aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-03-14 11:57:34 +0300
committeraneporada <aneporada@ydb.tech>2023-03-14 11:57:34 +0300
commit5d3df1b07e18c2aa5c2a7f5326b966ea3c55e296 (patch)
treeb01e2f3c81296899f3d3724c133334a1dd169614
parent9b9393c30c6fd29ba2bb4a0ba0ef2dd7ce0e1e72 (diff)
downloadydb-5d3df1b07e18c2aa5c2a7f5326b966ea3c55e296.tar.gz
Support TMultiType on minikql level
-rw-r--r--ydb/core/kqp/runtime/kqp_program_builder.cpp11
-rw-r--r--ydb/core/kqp/runtime/kqp_read_table.cpp53
-rw-r--r--ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp4
-rw-r--r--ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp1
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.cpp1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp20
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp6
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp8
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp16
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp20
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_discard.cpp15
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp27
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_join.cpp14
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp11
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp3
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_source.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp3
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp8
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp10
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp36
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp16
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp16
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp4
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp2
-rw-r--r--ydb/library/yql/minikql/mkql_node.cpp119
-rw-r--r--ydb/library/yql/minikql/mkql_node.h168
-rw-r--r--ydb/library/yql/minikql/mkql_node_cast.cpp1
-rw-r--r--ydb/library/yql/minikql/mkql_node_printer.cpp13
-rw-r--r--ydb/library/yql/minikql/mkql_node_serialization.cpp44
-rw-r--r--ydb/library/yql/minikql/mkql_node_visitor.cpp16
-rw-r--r--ydb/library/yql/minikql/mkql_node_visitor.h4
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp148
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h4
-rw-r--r--ydb/library/yql/providers/common/mkql/yql_type_mkql.cpp34
41 files changed, 526 insertions, 354 deletions
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp
index 78a230e7d3f..de26cb20d21 100644
--- a/ydb/core/kqp/runtime/kqp_program_builder.cpp
+++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp
@@ -131,7 +131,7 @@ TType* MakeWideFlowType(TProgramBuilder& builder, TStructType* rowType) {
tupleItems.push_back(rowType->GetMemberType(i));
}
- return builder.NewFlowType(builder.NewTupleType(tupleItems));
+ return builder.NewFlowType(builder.NewMultiType(tupleItems));
}
TType* MakeBlockType(TProgramBuilder& builder, TStructType* rowType) {
@@ -175,8 +175,7 @@ TRuntimeNode TKqpProgramBuilder::KqpWideReadTable(const TTableId& tableId, const
MKQL_ENSURE_S(returnType);
MKQL_ENSURE_S(returnType->IsFlow());
- const auto itemType = AS_TYPE(TFlowType, returnType)->GetItemType();
- MKQL_ENSURE_S(itemType->IsTuple());
+ GetWideComponents(AS_TYPE(TFlowType, returnType));
TCallableBuilder builder(Env, __func__, returnType);
builder.Add(BuildTableIdLiteral(tableId, *this));
@@ -199,8 +198,7 @@ TRuntimeNode TKqpProgramBuilder::KqpWideReadTableRanges(const TTableId& tableId,
} else {
MKQL_ENSURE_S(returnType);
MKQL_ENSURE_S(returnType->IsFlow());
- const auto itemType = AS_TYPE(TFlowType, returnType)->GetItemType();
- MKQL_ENSURE_S(itemType->IsTuple());
+ GetWideComponents(AS_TYPE(TFlowType, returnType));
}
TCallableBuilder builder(Env, __func__, returnType);
@@ -223,8 +221,7 @@ TRuntimeNode TKqpProgramBuilder::KqpBlockReadTableRanges(const TTableId& tableId
} else {
MKQL_ENSURE_S(returnType);
MKQL_ENSURE_S(returnType->IsFlow());
- const auto itemType = AS_TYPE(TFlowType, returnType)->GetItemType();
- MKQL_ENSURE_S(itemType->IsTuple());
+ GetWideComponents(AS_TYPE(TFlowType, returnType));
}
TCallableBuilder builder(Env, __func__, returnType);
diff --git a/ydb/core/kqp/runtime/kqp_read_table.cpp b/ydb/core/kqp/runtime/kqp_read_table.cpp
index d86b28999a7..b2dd91853a4 100644
--- a/ydb/core/kqp/runtime/kqp_read_table.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_table.cpp
@@ -473,49 +473,43 @@ private:
TParseReadTableRangesResult ParseResult;
};
+std::vector<EValueRepresentation> BuildRepresentations(const TType* type) {
+ std::vector<EValueRepresentation> representations;
+
+ auto wideComponents = type->IsFlow() ?
+ GetWideComponents(AS_TYPE(TFlowType, type)) :
+ AS_TYPE(TTupleType, AS_TYPE(TStreamType, type)->GetItemType())->GetElements();
+
+ representations.reserve(wideComponents.size());
+ for (ui32 i = 0U; i < wideComponents.size(); ++i) {
+ representations.emplace_back(GetValueRepresentation(wideComponents[i]));
+ }
+
+ return representations;
+}
+
} // namespace
IComputationNode* WrapKqpScanWideReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx,
TKqpScanComputeContext& computeCtx)
{
- std::vector<EValueRepresentation> representations;
-
auto parseResult = ParseWideReadTableRanges(callable);
auto rangesNode = LocateNode(ctx.NodeLocator, *parseResult.Ranges);
const auto type = callable.GetType()->GetReturnType();
- const auto returnItemType = type->IsFlow() ?
- AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType():
- AS_TYPE(TStreamType, callable.GetType()->GetReturnType())->GetItemType();
-
- const auto tupleType = AS_TYPE(TTupleType, returnItemType);
-
- representations.reserve(tupleType->GetElementsCount());
- for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i)
- representations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i)));
-
+ auto representations = BuildRepresentations(type);
return new TKqpScanWideReadTableRangesWrapper(computeCtx, parseResult, rangesNode, std::move(representations));
}
IComputationNode* WrapKqpScanWideReadTable(TCallable& callable, const TComputationNodeFactoryContext& ctx,
TKqpScanComputeContext& computeCtx)
{
- std::vector<EValueRepresentation> representations;
-
auto parseResult = ParseWideReadTable(callable);
auto fromNode = LocateNode(ctx.NodeLocator, *parseResult.FromTuple);
auto toNode = LocateNode(ctx.NodeLocator, *parseResult.ToTuple);
const auto type = callable.GetType()->GetReturnType();
- const auto returnItemType = type->IsFlow() ?
- AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType():
- AS_TYPE(TStreamType, callable.GetType()->GetReturnType())->GetItemType();
-
- const auto tupleType = AS_TYPE(TTupleType, returnItemType);
-
- representations.reserve(tupleType->GetElementsCount());
- for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i)
- representations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i)));
+ auto representations = BuildRepresentations(type);
return new TKqpScanWideReadTableWrapper(computeCtx, parseResult, fromNode, toNode, std::move(representations));
}
@@ -523,22 +517,11 @@ IComputationNode* WrapKqpScanWideReadTable(TCallable& callable, const TComputati
IComputationNode* WrapKqpScanBlockReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx,
TKqpScanComputeContext& computeCtx)
{
- std::vector<EValueRepresentation> representations;
-
auto parseResult = ParseWideReadTableRanges(callable);
auto rangesNode = LocateNode(ctx.NodeLocator, *parseResult.Ranges);
const auto type = callable.GetType()->GetReturnType();
- const auto returnItemType = type->IsFlow() ?
- AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType():
- AS_TYPE(TStreamType, callable.GetType()->GetReturnType())->GetItemType();
-
- const auto tupleType = AS_TYPE(TTupleType, returnItemType);
-
- representations.reserve(tupleType->GetElementsCount());
- for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i)
- representations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i)));
-
+ auto representations = BuildRepresentations(type);
return new TKqpScanBlockReadTableRangesWrapper(computeCtx, parseResult, rangesNode, std::move(representations));
}
diff --git a/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp b/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp
index be86991d410..376156f7f13 100644
--- a/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp
+++ b/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp
@@ -75,6 +75,10 @@ public:
Out << "BlockType";
}
+ void Visit(TMultiType&) override {
+ Out << "MultiType";
+ }
+
// Values
void Visit(TVoid&) override {
Out << "void";
diff --git a/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp b/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp
index 94b656e4bb8..09449f9cbee 100644
--- a/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp
+++ b/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp
@@ -509,6 +509,7 @@ bool IsArrowCompatible(const NKikimr::NMiniKQL::TType* type) {
case TType::EKind::Flow:
case TType::EKind::Tagged:
case TType::EKind::Pg:
+ case TType::EKind::Multi:
return false;
}
return false;
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index 5597ad4023c..204c9e02019 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -388,6 +388,7 @@ ui64 EstimateSizeImpl(const NUdf::TUnboxedValuePod& value, const NKikimr::NMiniK
case TType::EKind::Flow:
case TType::EKind::ReservedKind:
case TType::EKind::Block:
+ case TType::EKind::Multi:
THROW yexception() << "Unsupported type: " << type->GetKindAsStr();
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index 8e8af714913..00edfc9ed65 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -386,12 +386,12 @@ size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) {
size_t CalcMaxBlockLenForOutput(TType* out) {
const auto outputType = AS_TYPE(TFlowType, out);
- const auto outputTupleType = AS_TYPE(TTupleType, outputType->GetItemType());
- MKQL_ENSURE(outputTupleType->GetElementsCount() > 0, "Expecting at least one output column");
+ const auto wideComponents = GetWideComponents(outputType);
+ MKQL_ENSURE(wideComponents.size() > 0, "Expecting at least one output column");
size_t maxBlockItemSize = 0;
- for (ui32 i = 0; i < outputTupleType->GetElementsCount() - 1; ++i) {
- auto type = AS_TYPE(TBlockType, outputTupleType->GetElementType(i));
+ for (ui32 i = 0; i < wideComponents.size() - 1; ++i) {
+ auto type = AS_TYPE(TBlockType, wideComponents[i]);
MKQL_ENSURE(type->GetShape() != TBlockType::EShape::Scalar, "Expecting block type");
maxBlockItemSize = std::max(maxBlockItemSize, CalcMaxBlockItemSize(type->GetItemType()));
}
@@ -1423,7 +1423,8 @@ void FillAggStreams(TRuntimeNode streamsNode, TVector<TVector<ui32>>& streams) {
IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args");
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+ const auto wideComponents = GetWideComponents(flowType);
+ const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env);
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
@@ -1443,7 +1444,8 @@ IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNod
IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+ const auto wideComponents = GetWideComponents(flowType);
+ const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env);
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
@@ -1488,7 +1490,8 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation
IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args");
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+ const auto wideComponents = GetWideComponents(flowType);
+ const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env);
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
@@ -1519,7 +1522,8 @@ IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TCompu
IComputationNode* WrapBlockMergeManyFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+ const auto wideComponents = GetWideComponents(flowType);
+ const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env);
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
index 726adcd5b1f..bd83ebe6f98 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp
@@ -357,8 +357,8 @@ IComputationNode* WrapBlockCompress(TCallable& callable, const TComputationNodeF
MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args, got " << callable.GetInputsCount());
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
- const ui32 width = tupleType->GetElementsCount();
+ const auto wideComponents = GetWideComponents(flowType);
+ const ui32 width = wideComponents.size();
MKQL_ENSURE(width > 1, "Expected at least two columns");
const auto indexData = AS_VALUE(TDataLiteral, callable.GetInput(1U));
@@ -369,7 +369,7 @@ IComputationNode* WrapBlockCompress(TCallable& callable, const TComputationNodeF
bool bitmapIsScalar = false;
bool allScalars = true;
for (ui32 i = 0; i < width; ++i) {
- types.push_back(AS_TYPE(TBlockType, tupleType->GetElementType(i)));
+ types.push_back(AS_TYPE(TBlockType, wideComponents[i]));
bool isScalar = types.back()->GetShape() == TBlockType::EShape::Scalar;
if (i == width - 1) {
MKQL_ENSURE(isScalar, "Expecting scalar block size as last column");
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
index b60617c24af..3434148e261 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp
@@ -142,8 +142,8 @@ IComputationNode* WrapSkipTake(bool skip, TCallable& callable, const TComputatio
MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args");
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
- MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column");
+ const auto flowWidth = GetWideComponentsCount(flowType);
+ MKQL_ENSURE(flowWidth > 0, "Expected at least one column");
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
@@ -153,9 +153,9 @@ IComputationNode* WrapSkipTake(bool skip, TCallable& callable, const TComputatio
MKQL_ENSURE(countType->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected ui64");
if (skip) {
- return new TWideSkipBlocksWrapper(ctx.Mutables, wideFlow, count, tupleType->GetElementsCount());
+ return new TWideSkipBlocksWrapper(ctx.Mutables, wideFlow, count, flowWidth);
}
- return new TWideTakeBlocksWrapper(ctx.Mutables, wideFlow, count, tupleType->GetElementsCount());
+ return new TWideTakeBlocksWrapper(ctx.Mutables, wideFlow, count, flowWidth);
}
} //namespace
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp
index cfc075e855d..9b53c713ef3 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp
@@ -25,16 +25,16 @@ class TTopOrSortBlocksWrapper : public TStatefulWideFlowBlockComputationNode<TTo
using TBase = TStatefulWideFlowBlockComputationNode<TSelf>;
using TChunkedArrayIndex = TVector<IArrayBuilder::TArrayDataItem>;
public:
- TTopOrSortBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TTupleType* tupleType, IComputationNode* count,
+ TTopOrSortBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TArrayRef<TType* const> wideComponents, IComputationNode* count,
TVector<IComputationNode*>&& directions, TVector<ui32>&& keyIndicies)
- : TBase(mutables, flow, tupleType->GetElementsCount())
+ : TBase(mutables, flow, wideComponents.size())
, Flow_(flow)
, Count_(count)
, Directions_(std::move(directions))
, KeyIndicies_(std::move(keyIndicies))
{
- for (ui32 i = 0; i < tupleType->GetElementsCount() - 1; ++i) {
- Columns_.push_back(AS_TYPE(TBlockType, tupleType->GetElementType(i)));
+ for (ui32 i = 0; i < wideComponents.size() - 1; ++i) {
+ Columns_.push_back(AS_TYPE(TBlockType, wideComponents[i]));
}
}
@@ -502,8 +502,8 @@ IComputationNode* WrapTopOrSort(TCallable& callable, const TComputationNodeFacto
MKQL_ENSURE(inputsWithCount > 2U && !(inputsWithCount % 2U), "Expected more arguments.");
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
- MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column");
+ const auto wideComponents = GetWideComponents(flowType);
+ MKQL_ENSURE(wideComponents.size() > 0, "Expected at least one column");
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
@@ -519,12 +519,12 @@ IComputationNode* WrapTopOrSort(TCallable& callable, const TComputationNodeFacto
TVector<ui32> keyIndicies;
for (ui32 i = 2; i < inputsWithCount; i += 2) {
ui32 keyIndex = AS_VALUE(TDataLiteral, callable.GetInput(i - offset))->AsValue().Get<ui32>();
- MKQL_ENSURE(keyIndex + 1 < tupleType->GetElementsCount(), "Wrong key index");
+ MKQL_ENSURE(keyIndex + 1 < wideComponents.size(), "Wrong key index");
keyIndicies.push_back(keyIndex);
directions.push_back(LocateNode(ctx.NodeLocator, callable, i + 1 - offset));
}
- return new TTopOrSortBlocksWrapper<Sort, HasCount>(ctx.Mutables, wideFlow, tupleType, count, std::move(directions), std::move(keyIndicies));
+ return new TTopOrSortBlocksWrapper<Sort, HasCount>(ctx.Mutables, wideFlow, wideComponents, count, std::move(directions), std::move(keyIndicies));
}
} //namespace
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 4f747fb7482..2074fab98c1 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -473,12 +473,8 @@ IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFa
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
- TVector<TType*> items;
- for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- items.push_back(tupleType->GetElementType(i));
- }
-
+ const auto wideComponents = GetWideComponents(flowType);
+ TVector<TType*> items(wideComponents.begin(), wideComponents.end());
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
@@ -497,11 +493,11 @@ IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNode
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
- MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column");
+ const auto wideComponents = GetWideComponents(flowType);
+ MKQL_ENSURE(wideComponents.size() > 0, "Expected at least one column");
TVector<TType*> items;
- for (ui32 i = 0; i < tupleType->GetElementsCount() - 1; ++i) {
- const auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(i));
+ for (ui32 i = 0; i < wideComponents.size() - 1; ++i) {
+ const auto blockType = AS_TYPE(TBlockType, wideComponents[i]);
items.push_back(blockType->GetItemType());
}
@@ -521,12 +517,12 @@ IComputationNode* WrapBlockExpandChunked(TCallable& callable, const TComputation
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+ const auto wideComponents = GetWideComponents(flowType);
auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
- return new TBlockExpandChunkedWrapper(ctx.Mutables, wideFlow, tupleType->GetElementsCount());
+ return new TBlockExpandChunkedWrapper(ctx.Mutables, wideFlow, wideComponents.size());
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_discard.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_discard.cpp
index 88a92bcd4e2..78c20eb3cf8 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_discard.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_discard.cpp
@@ -3,6 +3,7 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/mkql_runtime_version.h>
namespace NKikimr {
namespace NMiniKQL {
@@ -223,13 +224,15 @@ IComputationNode* WrapDiscard(TCallable& callable, const TComputationNodeFactory
const auto type = callable.GetType()->GetReturnType();
const auto flow = LocateNode(ctx.NodeLocator, callable, 0);
if (type->IsFlow()) {
- if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow))
- if (const auto itemType = AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType(); itemType->IsTuple())
- return new TDiscardWideFlowWrapper(wide, AS_TYPE(TTupleType, itemType)->GetElementsCount());
- else
- return new TDiscardWideFlowWrapper(wide, 0U);
- else
+ if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
+ auto flowType = AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType());
+ if (RuntimeVersion > 35 && flowType->GetItemType()->IsMulti() || flowType->GetItemType()->IsTuple()) {
+ return new TDiscardWideFlowWrapper(wide, GetWideComponentsCount(flowType));
+ }
+ return new TDiscardWideFlowWrapper(wide, 0U);
+ } else {
return new TDiscardFlowWrapper(flow);
+ }
} else if (type->IsStream()) {
return new TDiscardWrapper(ctx.Mutables, flow);
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp
index a6ea5576ca4..811d2463eca 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp
@@ -1712,7 +1712,7 @@ IComputationNode* WrapFlatMap(TCallable& callable, const TComputationNodeFactory
IComputationNode* WrapNarrowFlatMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() > 1U, "Expected at least two args.");
- const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount();
+ const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()));
MKQL_ENSURE(callable.GetInputsCount() == width + 2U, "Wrong signature.");
const auto last = callable.GetInputsCount() - 1U;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
index 4388da96073..29dfbf2cdd7 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
@@ -851,8 +851,8 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
const auto leftFlowNode = callable.GetInput(0);
const auto rightFlowNode = callable.GetInput(1);
- const auto leftFlowTupleType = AS_TYPE(TFlowType, leftFlowNode)->GetItemType();
- const auto rightFlowTupleType = AS_TYPE(TFlowType, rightFlowNode)->GetItemType();
+ const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode));
+ const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode));
const auto joinKindNode = callable.GetInput(2);
const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3));
const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(4));
@@ -863,17 +863,16 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0));
const auto flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1));
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType());
+ const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
std::vector<EValueRepresentation> outputRepresentations;
- outputRepresentations.reserve(tupleType->GetElementsCount());
- for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i)
- outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i)));
+ outputRepresentations.reserve(outputFlowComponents.size());
+ for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) {
+ outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i]));
+ }
std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames;
- std::vector<TType *> leftColumnsTypes, rightColumnsTypes;
-
- leftColumnsTypes.resize(AS_TYPE(TTupleType, leftFlowTupleType)->GetElementsCount());
- rightColumnsTypes.resize(AS_TYPE(TTupleType, rightFlowTupleType)->GetElementsCount());
+ std::vector<TType*> leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end());
+ std::vector<TType*> rightColumnsTypes(rightFlowComponents.begin(), rightFlowComponents.end());
leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount());
for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) {
@@ -895,14 +894,6 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto
rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>());
}
- for (ui32 i = 0; i < leftColumnsTypes.size(); ++i) {
- leftColumnsTypes[i] = AS_TYPE(TTupleType, leftFlowTupleType)->GetElementType(i);
- }
-
- for (ui32 i = 0; i < rightColumnsTypes.size(); ++i) {
- rightColumnsTypes[i] = AS_TYPE(TTupleType, rightFlowTupleType)->GetElementType(i);
- }
-
return new TGraceJoinWrapper(
ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind),
std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames),
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp
index 04946f18b93..3164ad4eeed 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp
@@ -2058,6 +2058,15 @@ IComputationNode* WrapCommonJoinCore(TCallable& callable, const TComputationNode
fieldTypes.emplace_back(tupleType->GetElementType(i));
inputRepresentations.emplace_back(GetValueRepresentation(fieldTypes.back()));
}
+ } else if (inputRowType->IsMulti()) {
+ const auto tupleType = AS_TYPE(TMultiType, inputRowType);
+ inputRepresentations.reserve(tupleType->GetElementsCount());
+ fieldTypes.reserve(tupleType->GetElementsCount());
+ for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) {
+ fieldTypes.emplace_back(tupleType->GetElementType(i));
+ inputRepresentations.emplace_back(GetValueRepresentation(fieldTypes.back()));
+ }
+
} else if (inputRowType->IsStruct()) {
const auto structType = AS_TYPE(TStructType, inputRowType);
inputRepresentations.reserve(structType->GetMembersCount());
@@ -2078,6 +2087,11 @@ IComputationNode* WrapCommonJoinCore(TCallable& callable, const TComputationNode
outputRepresentations.reserve(tupleType->GetElementsCount());
for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i)
outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i)));
+ } else if (outputRowType->IsMulti()) {
+ const auto tupleType = AS_TYPE(TMultiType, outputRowType);
+ outputRepresentations.reserve(tupleType->GetElementsCount());
+ for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i)
+ outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i)));
} else if (outputRowType->IsStruct()) {
const auto structType = AS_TYPE(TStructType, outputRowType);
outputRepresentations.reserve(structType->GetMembersCount());
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
index f7c3ddfc107..b37f588c707 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp
@@ -1878,7 +1878,9 @@ IComputationNode* WrapMapJoinCore(TCallable& callable, const TComputationNodeFac
for (ui32 i = 0; i < leftKeyColumns.size(); ++i) {
const auto leftColumnType = leftItemType->IsTuple() ?
AS_TYPE(TTupleType, leftItemType)->GetElementType(leftKeyColumns[i]):
- AS_TYPE(TStructType, leftItemType)->GetMemberType(leftKeyColumns[i]);
+ (leftItemType->IsMulti() ?
+ AS_TYPE(TMultiType, leftItemType)->GetElementType(leftKeyColumns[i]):
+ AS_TYPE(TStructType, leftItemType)->GetMemberType(leftKeyColumns[i]));
const auto rightType = isTupleKey ? AS_TYPE(TTupleType, dictKeyType)->GetElementType(i) : dictKeyType;
bool isOptional;
if (UnpackOptional(leftColumnType, isOptional)->IsSameType(*rightType)) {
@@ -1901,6 +1903,11 @@ IComputationNode* WrapMapJoinCore(TCallable& callable, const TComputationNodeFac
outputRepresentations.reserve(tupleType->GetElementsCount());
for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i)
outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i)));
+ } else if (returnItemType->IsMulti()) {
+ const auto multiType = AS_TYPE(TMultiType, returnItemType);
+ outputRepresentations.reserve(multiType->GetElementsCount());
+ for (ui32 i = 0U; i < multiType->GetElementsCount(); ++i)
+ outputRepresentations.emplace_back(GetValueRepresentation(multiType->GetElementType(i)));
} else if (returnItemType->IsStruct()) {
const auto structType = AS_TYPE(TStructType, returnItemType);
outputRepresentations.reserve(structType->GetMembersCount());
@@ -1914,7 +1921,7 @@ IComputationNode* WrapMapJoinCore(TCallable& callable, const TComputationNodeFac
#define NEW_WRAPPER(KIND, RIGHT_REQ, IS_TUPLE) \
if (type->IsFlow()) { \
if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) { \
- const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount(); \
+ const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); \
if (boolWithoutRight) \
return new TWideMapJoinWrapper<true, RIGHT_REQ, IS_TUPLE>(ctx.Mutables, \
std::move(leftKeyConverters), dictType, std::move(outputRepresentations), \
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
index 7f49641dd59..dc4eb919483 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
@@ -548,7 +548,8 @@ IComputationNode* WrapMultiMap(TCallable& callable, const TComputationNodeFactor
IComputationNode* WrapNarrowMultiMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() > 2U, "Expected at least three arguments.");
- const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount();
+ auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()));
+ const auto width = wideComponents.size();
MKQL_ENSURE(callable.GetInputsCount() > width + 2U, "Wrong signature.");
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp
index 78451056a88..6d1905e9fc8 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp
@@ -346,7 +346,7 @@ IComputationNode* WrapSkip(TCallable& callable, const TComputationNodeFactoryCon
const auto count = LocateNode(ctx.NodeLocator, callable, 1);
if (type->IsFlow()) {
if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow))
- return new TWideSkipWrapper(ctx.Mutables, wide, count, AS_TYPE(TTupleType, AS_TYPE(TFlowType, type)->GetItemType())->GetElementsCount());
+ return new TWideSkipWrapper(ctx.Mutables, wide, count, GetWideComponentsCount(AS_TYPE(TFlowType, type)));
else
return new TSkipFlowWrapper(ctx.Mutables, GetValueRepresentation(type), flow, count);
} else if (type->IsStream()) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_source.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_source.cpp
index 20cdca896be..d53311791f5 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_source.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_source.cpp
@@ -74,7 +74,7 @@ IComputationNode* WrapSourceOf(TCallable& callable, const TComputationNodeFactor
IComputationNode* WrapSource(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(!callable.GetInputsCount(), "Expected no args.");
- MKQL_ENSURE(!AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount(), "Expected zero width of output flow.");
+ MKQL_ENSURE(!GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())), "Expected zero width of output flow.");
return new TSourceWrapper;
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp
index cd677661aa5..5c5e5a4bd85 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp
@@ -277,8 +277,8 @@ private:
IComputationNode* WrapWideChain1Map(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() > 0U, "Expected argument.");
- const auto inputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount();
- const auto outputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount();
+ const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()));
+ const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
MKQL_ENSURE(callable.GetInputsCount() == inputWidth + outputWidth * 3U + 1U, "Wrong signature.");
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp
index fa4bc5df4c2..e06568fa321 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp
@@ -341,7 +341,8 @@ private:
IComputationNode* WrapWideChopper(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() >= 4U, "Expected at least four args.");
- const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount();
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()));
+ const ui32 width = wideComponents.size();
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
const auto keysSize = (callable.GetInputsCount() - width - 4U) >> 1U;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
index b2c7698bd2a..16b86f90739 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -1054,8 +1054,8 @@ template<bool Last>
IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() >= (Last ? 3U : 4U), "Expected more arguments.");
- const auto inputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount();
- const auto outputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount();
+ const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()));
+ const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
index b65ba68c214..b8a1ff2ed67 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
@@ -253,8 +253,8 @@ private:
IComputationNode* WrapWideCondense1(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() >= 2U, "Expected at least two args.");
- const auto inputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount();
- const auto outputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount();
+ const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()));
+ const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp
index a3fb6dd736d..a4d4606bc52 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp
@@ -399,7 +399,7 @@ private:
template<bool TakeOrSkip, bool Inclusive>
IComputationNode* WrapWideWhile(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount();
+ const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
MKQL_ENSURE(callable.GetInputsCount() == width + 2U, "Expected 3 or more args.");
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
@@ -424,7 +424,7 @@ IComputationNode* WrapWideWhile(TCallable& callable, const TComputationNodeFacto
}
IComputationNode* WrapWideFilter(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount();
+ const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
MKQL_ENSURE(callable.GetInputsCount() == width + 2U || callable.GetInputsCount() == width + 3U, "Expected 3 or more args.");
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp
index 32586e61a8a..4ca15daf5e5 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp
@@ -259,7 +259,7 @@ private:
}
IComputationNode* WrapExpandMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount();
+ const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
MKQL_ENSURE(callable.GetInputsCount() == width + 2U, "Expected two or more args.");
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
@@ -273,8 +273,8 @@ IComputationNode* WrapExpandMap(TCallable& callable, const TComputationNodeFacto
IComputationNode* WrapWideMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() > 0U, "Expected argument.");
- const auto inputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount();
- const auto outputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount();
+ const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()));
+ const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
MKQL_ENSURE(callable.GetInputsCount() == inputWidth + outputWidth + 1U, "Wrong signature.");
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
@@ -295,7 +295,7 @@ IComputationNode* WrapWideMap(TCallable& callable, const TComputationNodeFactory
IComputationNode* WrapNarrowMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() > 1U, "Expected two or more args.");
- const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount();
+ const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()));
MKQL_ENSURE(callable.GetInputsCount() == width + 2U, "Wrong signature.");
const auto flow = LocateNode(ctx.NodeLocator, callable, 0U);
if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp
index ae59e906c70..39aa47b86dd 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp
@@ -516,8 +516,8 @@ IComputationNode* WrapWideTopT(TCallable& callable, const TComputationNodeFactor
}
const auto keyWidth = (inputsWithCount >> 1U) - 1U;
- const auto inputType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType());
- std::vector<ui32> indexes(inputType->GetElementsCount());
+ const auto inputWideComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType()));
+ std::vector<ui32> indexes(inputWideComponents.size());
TKeyTypes keyTypes(keyWidth);
std::unordered_set<ui32> keyIndexes;
@@ -525,7 +525,7 @@ IComputationNode* WrapWideTopT(TCallable& callable, const TComputationNodeFactor
const auto keyIndex = AS_VALUE(TDataLiteral, callable.GetInput(((i + 1U) << 1U) - offset))->AsValue().Get<ui32>();
indexes[i] = keyIndex;
keyIndexes.emplace(keyIndex);
- keyTypes[i].first = *UnpackOptionalData(inputType->GetElementType(keyIndex), keyTypes[i].second)->GetDataSlot();
+ keyTypes[i].first = *UnpackOptionalData(inputWideComponents[keyIndex], keyTypes[i].second)->GetDataSlot();
}
size_t payloadPos = keyTypes.size();
@@ -537,9 +537,9 @@ IComputationNode* WrapWideTopT(TCallable& callable, const TComputationNodeFactor
indexes[payloadPos++] = i;
}
- std::vector<EValueRepresentation> representations(inputType->GetElementsCount());
+ std::vector<EValueRepresentation> representations(inputWideComponents.size());
for (auto i = 0U; i < representations.size(); ++i)
- representations[i] = GetValueRepresentation(inputType->GetElementType(indexes[i]));
+ representations[i] = GetValueRepresentation(inputWideComponents[indexes[i]]);
TComputationNodePtrVector directions(keyWidth);
auto index = 1U - offset;
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp
index 4b12ffc7c35..830ee7152ad 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp
@@ -114,7 +114,7 @@ TRuntimeNode MakeFlow(TSetup_& setup) {
TProgramBuilder& pb = *setup.PgmBuilder;
TCallableBuilder callableBuilder(*setup.Env, "TestBlockFlow",
pb.NewFlowType(
- pb.NewTupleType({
+ pb.NewMultiType({
pb.NewBlockType(pb.NewDataType(NUdf::EDataSlot::Uint64), TBlockType::EShape::Many),
pb.NewBlockType(pb.NewDataType(NUdf::EDataSlot::Uint64), TBlockType::EShape::Scalar),
pb.NewBlockType(pb.NewDataType(NUdf::EDataSlot::Uint64), TBlockType::EShape::Scalar),
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
index 8ad3b86c169..2bb910d4ad9 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
@@ -370,7 +370,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -444,7 +444,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -507,7 +507,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -575,7 +575,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
pb.NewTuple({key4, payload6})
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -643,7 +643,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
pb.NewTuple({key4, payload6})
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -716,7 +716,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
@@ -851,7 +851,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
pb.NewTuple({key4, payload6})
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
@@ -912,7 +912,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
pb.NewTuple({key4, payload6})
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
@@ -977,7 +977,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
pb.NewTuple({key4, payload6})
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
@@ -1039,7 +1039,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
pb.NewTuple({key4, payload6})
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -1112,7 +1112,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
pb.NewTuple({key4, payload6})
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
@@ -1172,7 +1172,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
@@ -1236,7 +1236,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -1312,7 +1312,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
pb.NewTuple({key4, payload6})
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
@@ -1377,7 +1377,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
pb.NewTuple({key4, payload6})
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
@@ -1440,7 +1440,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -1517,7 +1517,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp
index fb25e1ad1be..89e2827eb41 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp
@@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreTupleTest) {
const auto list = pb.NewList(tupleType, {data1, data3, data2, data4});
- const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType}));
+ const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType}));
const auto pgmReturn = pb.Collect(pb.CommonJoinCore(pb.ToFlow(list), EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, std::nullopt, EAnyJoinSettings::None, 3U, outputType));
const auto graph = setup.BuildGraph(pgmReturn);
@@ -57,7 +57,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreTupleTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4});
- const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType}));
+ const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType}));
const auto pgmReturn = pb.Collect(pb.CommonJoinCore(pb.ToFlow(list), EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, {0U}, EAnyJoinSettings::None, 3U, outputType));
const auto graph = setup.BuildGraph(pgmReturn);
@@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreTupleTest) {
const auto list = pb.NewList(tupleType, {data3, data4, data1, data2});
- const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType}));
+ const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType}));
const auto pgmReturn = pb.Collect(pb.CommonJoinCore(pb.ToFlow(list), EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, {1U}, EAnyJoinSettings::None, 3U, outputType));
const auto graph = setup.BuildGraph(pgmReturn);
@@ -133,7 +133,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) {
const auto list = pb.NewList(tupleType, {data1, data3, data2, data4});
- const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType}));
+ const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType}));
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.CommonJoinCore(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U), pb.Nth(item, 3U)}; }),
EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, std::nullopt, EAnyJoinSettings::None, 3U, outputType),
@@ -174,7 +174,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4});
- const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType}));
+ const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType}));
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.CommonJoinCore(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U), pb.Nth(item, 3U)}; }),
EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, {0U}, EAnyJoinSettings::None, 3U, outputType),
@@ -215,7 +215,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) {
const auto list = pb.NewList(tupleType, {data3, data4, data1, data2});
- const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType}));
+ const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType}));
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.CommonJoinCore(pb.ExpandMap(pb.ToFlow(list),
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U), pb.Nth(item, 3U)}; }),
EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, {1U}, EAnyJoinSettings::None, 3U, outputType),
@@ -262,7 +262,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4});
- const auto outputType = pb.NewFlowType(pb.NewTupleType({optStrType, optStrType}));
+ const auto outputType = pb.NewFlowType(pb.NewMultiType({optStrType, optStrType}));
const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("ACHTUNG MINEN!");
@@ -303,7 +303,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) {
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4});
- const auto outputType = pb.NewFlowType(pb.NewTupleType({optStrType, optStrType}));
+ const auto outputType = pb.NewFlowType(pb.NewMultiType({optStrType, optStrType}));
const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("ACHTUNG MINEN!");
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp
index 6adb2643c62..45f039e2099 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp
@@ -628,7 +628,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) {
return pb.NewTuple({pb.Nth(item, 1U)});
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -695,7 +695,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) {
return pb.NewTuple({pb.Nth(item, 1U)});
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -768,7 +768,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) {
return pb.NewTuple({pb.Nth(item, 1U)});
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -838,7 +838,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) {
return pb.NewTuple({pb.Nth(item, 1U)});
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id)
}));
@@ -914,7 +914,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) {
return pb.NewTuple({pb.Nth(item, 1U)});
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
@@ -981,7 +981,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) {
return pb.NewTuple({pb.Nth(item, 1U)});
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
@@ -1048,7 +1048,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) {
return pb.NewTuple({pb.Nth(item, 1U)});
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
@@ -1118,7 +1118,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) {
return pb.NewTuple({pb.Nth(item, 1U)});
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<ui32>::Id)
}));
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
index c85651f48b2..a3aae17bfdc 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
@@ -307,6 +307,10 @@ private:
VisitType<TBlockType>(node);
}
+ void Visit(TMultiType& node) override {
+ VisitType<TMultiType>(node);
+ }
+
void Visit(TTaggedType& node) override {
VisitType<TTaggedType>(node);
}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
index 685202f0b6a..8fc702bd8a4 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
@@ -410,7 +410,7 @@ TRuntimeNode CreateMapJoin(TProgramBuilder& pb, size_t vecSize, TCallable* list
return pb.NewTuple({pb.Nth(item, 1U)});
});
- const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ const auto resultType = pb.NewFlowType(pb.NewMultiType({
pb.NewDataType(NUdf::TDataType<char*>::Id),
pb.NewDataType(NUdf::TDataType<char*>::Id),
}));
diff --git a/ydb/library/yql/minikql/mkql_node.cpp b/ydb/library/yql/minikql/mkql_node.cpp
index ab1dd70f815..7a3f2cc3673 100644
--- a/ydb/library/yql/minikql/mkql_node.cpp
+++ b/ydb/library/yql/minikql/mkql_node.cpp
@@ -3,6 +3,7 @@
#include "mkql_node_cast.h"
#include "mkql_node_visitor.h"
#include "mkql_node_printer.h"
+#include "mkql_runtime_version.h"
#include <ydb/library/yql/parser/pg_catalog/catalog.h>
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
@@ -213,6 +214,7 @@ TStringBuf TType::GetKindAsStr() const {
xx(Tagged, TTaggedType) \
xx(Block, TBlockType) \
xx(Pg, TPgType) \
+ xx(Multi, TMultiType) \
void TType::Accept(INodeVisitor& visitor) {
switch (Kind) {
@@ -1903,110 +1905,6 @@ bool TAny::Equals(const TAny& nodeToCompare) const {
return Item.GetNode()->Equals(*nodeToCompare.Item.GetNode());
}
-TTupleType::TTupleType(ui32 elementsCount, TType** elements, const TTypeEnvironment& env, bool validate)
- : TType(EKind::Tuple, env.GetTypeOfType())
- , ElementsCount(elementsCount)
- , Elements(elements)
-{
- if (!validate)
- return;
-}
-
-TTupleType* TTupleType::Create(ui32 elementsCount, TType* const* elements, const TTypeEnvironment& env) {
- TType** allocatedElements = nullptr;
- if (elementsCount) {
- allocatedElements = static_cast<TType**>(env.AllocateBuffer(elementsCount * sizeof(*allocatedElements)));
- for (ui32 i = 0; i < elementsCount; ++i) {
- allocatedElements[i] = elements[i];
- }
- }
-
- return ::new(env.Allocate<TTupleType>()) TTupleType(elementsCount, allocatedElements, env);
-}
-
-bool TTupleType::IsSameType(const TTupleType& typeToCompare) const {
- if (this == &typeToCompare)
- return true;
-
- if (ElementsCount != typeToCompare.ElementsCount)
- return false;
-
- for (size_t index = 0; index < ElementsCount; ++index) {
- if (!Elements[index]->IsSameType(*typeToCompare.Elements[index]))
- return false;
- }
-
- return true;
-}
-
-bool TTupleType::IsConvertableTo(const TTupleType& typeToCompare, bool ignoreTagged) const {
- if (this == &typeToCompare)
- return true;
-
- if (ElementsCount != typeToCompare.ElementsCount)
- return false;
-
- for (size_t index = 0; index < ElementsCount; ++index) {
- if (!Elements[index]->IsConvertableTo(*typeToCompare.Elements[index], ignoreTagged))
- return false;
- }
-
- return true;
-}
-
-void TTupleType::DoUpdateLinks(const THashMap<TNode*, TNode*>& links) {
- for (ui32 i = 0; i < ElementsCount; ++i) {
- auto& element = Elements[i];
- auto elementIt = links.find(element);
- if (elementIt != links.end()) {
- TNode* newNode = elementIt->second;
- Y_VERIFY_DEBUG(element->Equals(*newNode));
- element = static_cast<TType*>(newNode);
- }
- }
-}
-
-TNode* TTupleType::DoCloneOnCallableWrite(const TTypeEnvironment& env) const {
- bool needClone = false;
- for (ui32 i = 0; i < ElementsCount; ++i) {
- if (Elements[i]->GetCookie()) {
- needClone = true;
- break;
- }
- }
-
- if (!needClone)
- return const_cast<TTupleType*>(this);
-
- TType** allocatedElements = nullptr;
- if (ElementsCount) {
- allocatedElements = static_cast<TType**>(env.AllocateBuffer(ElementsCount * sizeof(*allocatedElements)));
- for (ui32 i = 0; i < ElementsCount; ++i) {
- allocatedElements[i] = Elements[i];
- auto newNode = (TNode*)Elements[i]->GetCookie();
- if (newNode) {
- allocatedElements[i] = static_cast<TType*>(newNode);
- }
- }
- }
-
- return ::new(env.Allocate<TTupleType>()) TTupleType(ElementsCount, allocatedElements, env, false);
-}
-
-void TTupleType::DoFreeze(const TTypeEnvironment& env) {
- Y_UNUSED(env);
-}
-
-bool TTupleType::CalculatePresortSupport() {
- for (ui32 i = 0; i < ElementsCount; ++i) {
- if (!Elements[i]->IsPresortSupported()) {
- return false;
- }
- }
-
- return true;
-}
-
TTupleLiteral::TTupleLiteral(TRuntimeNode* values, TTupleType* type, bool validate)
: TNode(type)
, Values(values)
@@ -2371,6 +2269,7 @@ EValueRepresentation GetValueRepresentation(const TType* type) {
case TType::EKind::Callable:
case TType::EKind::EmptyList:
case TType::EKind::EmptyDict:
+ case TType::EKind::Multi:
return EValueRepresentation::Boxed;
case TType::EKind::Variant:
@@ -2391,5 +2290,17 @@ EValueRepresentation GetValueRepresentation(const TType* type) {
}
}
+TArrayRef<TType* const> GetWideComponents(const TFlowType* type) {
+ if (RuntimeVersion > 35) {
+ return AS_TYPE(TMultiType, type->GetItemType())->GetElements();
+ }
+ return AS_TYPE(TTupleType, type->GetItemType())->GetElements();
+}
+
+TArrayRef<TType* const> GetWideComponents(const TStreamType* type) {
+ MKQL_ENSURE(RuntimeVersion > 35, "Wide stream is not supported in runtime version " << RuntimeVersion);
+ return AS_TYPE(TMultiType, type->GetItemType())->GetElements();
+}
+
}
}
diff --git a/ydb/library/yql/minikql/mkql_node.h b/ydb/library/yql/minikql/mkql_node.h
index 12eb7bf85da..fc70424d5de 100644
--- a/ydb/library/yql/minikql/mkql_node.h
+++ b/ydb/library/yql/minikql/mkql_node.h
@@ -149,7 +149,8 @@ class TTypeEnvironment;
XX(EmptyDict, 32 + 2) \
XX(Tagged, 48 + 7) \
XX(Block, 16 + 13) \
- XX(Pg, 16 + 3)
+ XX(Pg, 16 + 3) \
+ XX(Multi, 16 + 11)
class TType : public TNode {
public:
@@ -256,6 +257,9 @@ using TEmptyDictType = TSingularType<TType::EKind::EmptyDict>;
template <TType::EKind SingularKind>
TType* GetTypeOfSingular(const TTypeEnvironment& env);
+template <typename TLiteralType>
+TLiteralType* GetEmptyLiteral(const TTypeEnvironment& env);
+
template <TType::EKind SingularKind>
class TSingular : public TNode {
friend class TTypeEnvironment;
@@ -535,6 +539,11 @@ inline TType* GetTypeOfSingular<TType::EKind::EmptyDict>(const TTypeEnvironment&
return env.GetTypeOfEmptyDict();
}
+template <>
+inline TTupleLiteral* GetEmptyLiteral(const TTypeEnvironment& env) {
+ return env.GetEmptyTuple();
+}
+
class TDataType : public TType {
friend class TType;
public:
@@ -1176,16 +1185,60 @@ private:
TRuntimeNode Item;
};
-class TTupleType : public TType {
+template<typename TDerived, TType::EKind DerivedKind>
+class TTupleLikeType : public TType {
friend class TType;
+using TSelf = TTupleLikeType<TDerived, DerivedKind>;
public:
- static TTupleType* Create(ui32 elementsCount, TType* const* elements, const TTypeEnvironment& env);
+ static TDerived* Create(ui32 elementsCount, TType* const* elements, const TTypeEnvironment& env) {
+ TType **allocatedElements = nullptr;
+ if (elementsCount) {
+ allocatedElements = static_cast<TType **>(env.AllocateBuffer(elementsCount * sizeof(*allocatedElements)));
+ for (ui32 i = 0; i < elementsCount; ++i) {
+ allocatedElements[i] = elements[i];
+ }
+ }
+
+ return ::new (env.Allocate<TDerived>()) TDerived(elementsCount, allocatedElements, env);
+ }
using TType::IsSameType;
- bool IsSameType(const TTupleType& typeToCompare) const;
+ bool IsSameType(const TDerived& typeToCompare) const {
+ if (this == &typeToCompare) {
+ return true;
+ }
+
+ if (ElementsCount != typeToCompare.ElementsCount) {
+ return false;
+ }
+
+ for (size_t index = 0; index < ElementsCount; ++index) {
+ if (!Elements[index]->IsSameType(*typeToCompare.Elements[index])) {
+ return false;
+ }
+ }
+
+ return true;
+ }
using TType::IsConvertableTo;
- bool IsConvertableTo(const TTupleType& typeToCompare, bool ignoreTagged = false) const;
+ bool IsConvertableTo(const TDerived& typeToCompare, bool ignoreTagged = false) const {
+ if (this == &typeToCompare) {
+ return true;
+ }
+
+ if (ElementsCount != typeToCompare.GetElementsCount()) {
+ return false;
+ }
+
+ for (size_t index = 0; index < ElementsCount; ++index) {
+ if (!Elements[index]->IsConvertableTo(*typeToCompare.GetElementType(index), ignoreTagged)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
ui32 GetElementsCount() const {
return ElementsCount;
@@ -1196,19 +1249,102 @@ public:
return Elements[index];
}
+ TArrayRef<TType* const> GetElements() const {
+ return TArrayRef<TType* const>(Elements, ElementsCount);
+ }
+
+ protected:
+ TTupleLikeType(ui32 elementsCount, TType** elements, const TTypeEnvironment& env)
+ : TType(DerivedKind, env.GetTypeOfType())
+ , ElementsCount(elementsCount)
+ , Elements(elements)
+ {
+ }
+
private:
- TTupleType(ui32 elemntsCount, TType** elements, const TTypeEnvironment& env, bool validate = true);
+ void DoUpdateLinks(const THashMap<TNode*, TNode*>& links) {
+ for (ui32 i = 0; i < ElementsCount; ++i) {
+ auto &element = Elements[i];
+ auto elementIt = links.find(element);
+ if (elementIt != links.end()) {
+ TNode* newNode = elementIt->second;
+ Y_VERIFY_DEBUG(element->Equals(*newNode));
+ element = static_cast<TType*>(newNode);
+ }
+ }
+ }
- void DoUpdateLinks(const THashMap<TNode*, TNode*>& links);
- TNode* DoCloneOnCallableWrite(const TTypeEnvironment& env) const;
- void DoFreeze(const TTypeEnvironment& env);
- bool CalculatePresortSupport() override;
+ TNode* DoCloneOnCallableWrite(const TTypeEnvironment& env) const {
+ bool needClone = false;
+ for (ui32 i = 0; i < ElementsCount; ++i) {
+ if (Elements[i]->GetCookie()) {
+ needClone = true;
+ break;
+ }
+ }
+
+ if (!needClone) {
+ return const_cast<TSelf*>(this);
+ }
+
+ TType** allocatedElements = nullptr;
+ if (ElementsCount) {
+ allocatedElements = static_cast<TType**>(env.AllocateBuffer(ElementsCount * sizeof(*allocatedElements)));
+ for (ui32 i = 0; i < ElementsCount; ++i) {
+ allocatedElements[i] = Elements[i];
+ auto newNode = (TNode *)Elements[i]->GetCookie();
+ if (newNode) {
+ allocatedElements[i] = static_cast<TType*>(newNode);
+ }
+ }
+ }
+
+ return ::new (env.Allocate<TDerived>()) TDerived(ElementsCount, allocatedElements, env);
+ }
+
+ void DoFreeze(const TTypeEnvironment& env) {
+ Y_UNUSED(env);
+ }
+
+ bool CalculatePresortSupport() override {
+ for (ui32 i = 0; i < ElementsCount; ++i) {
+ if (!Elements[i]->IsPresortSupported()) {
+ return false;
+ }
+ }
+ return true;
+ }
private:
ui32 ElementsCount;
TType** Elements;
};
+class TTupleType : public TTupleLikeType<TTupleType, TType::EKind::Tuple> {
+private:
+ friend class TType;
+ using TBase = TTupleLikeType<TTupleType, TType::EKind::Tuple>;
+ friend TBase;
+
+ TTupleType(ui32 elementsCount, TType** elements, const TTypeEnvironment& env)
+ : TBase(elementsCount, elements, env)
+ {
+ }
+};
+
+class TMultiType : public TTupleLikeType<TMultiType, TType::EKind::Multi> {
+private:
+ friend class TType;
+ using TBase = TTupleLikeType<TMultiType, TType::EKind::Multi>;
+ friend TBase;
+
+ TMultiType(ui32 elementsCount, TType** elements, const TTypeEnvironment& env)
+ : TBase(elementsCount, elements, env)
+ {
+ }
+};
+
+
class TTupleLiteral : public TNode {
friend class TNode;
public:
@@ -1448,6 +1584,17 @@ enum class EValueRepresentation {
EValueRepresentation GetValueRepresentation(const TType* type);
EValueRepresentation GetValueRepresentation(NUdf::TDataTypeId typeId);
+TArrayRef<TType* const> GetWideComponents(const TFlowType* type);
+TArrayRef<TType* const> GetWideComponents(const TStreamType* type);
+
+inline ui32 GetWideComponentsCount(const TFlowType* type) {
+ return (ui32)GetWideComponents(type).size();
+}
+
+inline ui32 GetWideComponentsCount(const TStreamType* type) {
+ return (ui32)GetWideComponents(type).size();
+}
+
template <TType::EKind SingularKind>
TSingularType<SingularKind>* TSingularType<SingularKind>::Create(TTypeType* type, const TTypeEnvironment& env) {
return ::new(env.Allocate<TSingularType<SingularKind>>()) TSingularType<SingularKind>(type);
@@ -1512,6 +1659,5 @@ template <TType::EKind SingularKind>
void TSingular<SingularKind>::DoFreeze(const TTypeEnvironment& env) {
Y_UNUSED(env);
}
-
}
}
diff --git a/ydb/library/yql/minikql/mkql_node_cast.cpp b/ydb/library/yql/minikql/mkql_node_cast.cpp
index 84344f2cb4e..25d7aa6e0bd 100644
--- a/ydb/library/yql/minikql/mkql_node_cast.cpp
+++ b/ydb/library/yql/minikql/mkql_node_cast.cpp
@@ -54,6 +54,7 @@ MKQL_AS_TYPE(Flow)
MKQL_AS_TYPE(Tagged)
MKQL_AS_TYPE(Block)
MKQL_AS_TYPE(Pg)
+MKQL_AS_TYPE(Multi)
MKQL_AS_VALUE(Any, Type)
MKQL_AS_VALUE(Callable, Type)
diff --git a/ydb/library/yql/minikql/mkql_node_printer.cpp b/ydb/library/yql/minikql/mkql_node_printer.cpp
index 08f13868e05..cd036d1196c 100644
--- a/ydb/library/yql/minikql/mkql_node_printer.cpp
+++ b/ydb/library/yql/minikql/mkql_node_printer.cpp
@@ -359,9 +359,10 @@ namespace {
WriteNewline();
}
- void Visit(TTupleType& node) override {
+ template<typename T>
+ void VisitTupleLike(T& node, std::string_view name) {
WriteIndentation();
- Out << "Type (Tuple) with " << node.GetElementsCount() << " elements {";
+ Out << "Type (" << name << ") with " << node.GetElementsCount() << " elements {";
WriteNewline();
{
@@ -387,6 +388,14 @@ namespace {
WriteNewline();
}
+ void Visit(TTupleType& node) override {
+ VisitTupleLike(node, "Tuple");
+ }
+
+ void Visit(TMultiType& node) override {
+ VisitTupleLike(node, "Multi");
+ }
+
void Visit(TResourceType& node) override {
Y_UNUSED(node);
WriteIndentation();
diff --git a/ydb/library/yql/minikql/mkql_node_serialization.cpp b/ydb/library/yql/minikql/mkql_node_serialization.cpp
index 2f72ca37397..8f0fce58245 100644
--- a/ydb/library/yql/minikql/mkql_node_serialization.cpp
+++ b/ydb/library/yql/minikql/mkql_node_serialization.cpp
@@ -235,6 +235,23 @@ namespace {
IsProcessed0 = false;
}
+ void Visit(TMultiType& node) override {
+ if (node.GetCookie() != 0) {
+ Owner.WriteReference(node);
+ IsProcessed0 = true;
+ return;
+ }
+
+ Owner.Write(TypeMarker | (char)TType::EKind::Multi);
+ Owner.WriteVar32(node.GetElementsCount());
+ for (ui32 i = node.GetElementsCount(); i-- > 0;) {
+ auto elementType = node.GetElementType(i);
+ Owner.AddChildNode(*elementType);
+ }
+
+ IsProcessed0 = false;
+ }
+
void Visit(TTaggedType& node) override {
if (node.GetCookie() != 0) {
Owner.WriteReference(node);
@@ -617,6 +634,10 @@ namespace {
Owner.RegisterReference(node);
}
+ void Visit(TMultiType& node) override {
+ Owner.RegisterReference(node);
+ }
+
void Visit(TTaggedType& node) override {
auto tag = node.GetTagStr();
Owner.WriteName(tag);
@@ -1168,8 +1189,8 @@ namespace {
return TryReadCallableType(code);
case TType::EKind::Any:
return 0;
- case TType::EKind::Tuple:
- return TryReadTupleType();
+ case TType::EKind::Tuple: // and Multi
+ return TryReadTupleOrMultiType();
case TType::EKind::Resource:
return 0;
case TType::EKind::Variant:
@@ -1206,7 +1227,7 @@ namespace {
case TType::EKind::Any:
return ReadAnyType();
case TType::EKind::Tuple:
- return ReadTupleType();
+ return ReadTupleOrMultiType(code);
case TType::EKind::Resource:
return ReadResourceType();
case TType::EKind::Variant:
@@ -1273,7 +1294,7 @@ namespace {
return membersCount;
}
- ui32 TryReadTupleType() {
+ ui32 TryReadTupleOrMultiType() {
const ui32 elementsCount = ReadVar32();
return elementsCount;
}
@@ -1297,7 +1318,7 @@ namespace {
return node;
}
- TNode* ReadTupleType() {
+ TNode* ReadTupleOrMultiType(char code) {
const ui32 elementsCount = ReadVar32();
TStackVec<TType*> elements(elementsCount);
for (ui32 i = 0; i < elementsCount; ++i) {
@@ -1308,7 +1329,18 @@ namespace {
elements[i] = elementType;
}
- auto node = TTupleType::Create(elementsCount, elements.data(), Env);
+ TNode* node = nullptr;
+ switch ((TType::EKind)(code & TypeMask)) {
+ case TType::EKind::Tuple:
+ node = TTupleType::Create(elementsCount, elements.data(), Env);
+ break;
+ case TType::EKind::Multi:
+ node = TMultiType::Create(elementsCount, elements.data(), Env);
+ break;
+ default:
+ ThrowCorrupted();
+ }
+
Nodes.push_back(node);
return node;
}
diff --git a/ydb/library/yql/minikql/mkql_node_visitor.cpp b/ydb/library/yql/minikql/mkql_node_visitor.cpp
index cc8a8562f47..80eea7f9374 100644
--- a/ydb/library/yql/minikql/mkql_node_visitor.cpp
+++ b/ydb/library/yql/minikql/mkql_node_visitor.cpp
@@ -176,6 +176,11 @@ void TThrowingNodeVisitor::Visit(TBlockType& node) {
ThrowUnexpectedNodeType();
}
+void TThrowingNodeVisitor::Visit(TMultiType& node) {
+ Y_UNUSED(node);
+ ThrowUnexpectedNodeType();
+}
+
void TThrowingNodeVisitor::ThrowUnexpectedNodeType() {
THROW yexception() << "Unexpected node type";
}
@@ -312,6 +317,10 @@ void TEmptyNodeVisitor::Visit(TBlockType& node) {
Y_UNUSED(node);
}
+void TEmptyNodeVisitor::Visit(TMultiType& node) {
+ Y_UNUSED(node);
+}
+
void TExploringNodeVisitor::Visit(TTypeType& node) {
Y_VERIFY_DEBUG(node.GetType() == &node);
}
@@ -494,6 +503,13 @@ void TExploringNodeVisitor::Visit(TBlockType& node) {
AddChildNode(&node, *node.GetItemType());
}
+void TExploringNodeVisitor::Visit(TMultiType& node) {
+ AddChildNode(&node, *node.GetType());
+ for (ui32 i = 0, e = node.GetElementsCount(); i < e; ++i) {
+ AddChildNode(&node, *node.GetElementType(i));
+ }
+}
+
void TExploringNodeVisitor::AddChildNode(TNode* parent, TNode& child) {
Stack->push_back(&child);
diff --git a/ydb/library/yql/minikql/mkql_node_visitor.h b/ydb/library/yql/minikql/mkql_node_visitor.h
index 5c4528b517c..dfd76942da5 100644
--- a/ydb/library/yql/minikql/mkql_node_visitor.h
+++ b/ydb/library/yql/minikql/mkql_node_visitor.h
@@ -47,6 +47,7 @@ public:
virtual void Visit(TFlowType& node) = 0;
virtual void Visit(TTaggedType& node) = 0;
virtual void Visit(TBlockType& node) = 0;
+ virtual void Visit(TMultiType& node) = 0;
};
class TThrowingNodeVisitor : public INodeVisitor {
@@ -84,6 +85,7 @@ public:
void Visit(TFlowType& node) override;
void Visit(TTaggedType& node) override;
void Visit(TBlockType& node) override;
+ void Visit(TMultiType& node) override;
protected:
static void ThrowUnexpectedNodeType();
@@ -124,6 +126,7 @@ public:
void Visit(TFlowType& node) override;
void Visit(TTaggedType& node) override;
void Visit(TBlockType& node) override;
+ void Visit(TMultiType& node) override;
};
class TExploringNodeVisitor : public INodeVisitor {
@@ -163,6 +166,7 @@ public:
void Visit(TFlowType& node) override;
void Visit(TTaggedType& node) override;
void Visit(TBlockType& node) override;
+ void Visit(TMultiType& node) override;
void Walk(TNode* root, const TTypeEnvironment& env, const std::vector<TNode*>& terminalNodes = std::vector<TNode*>(),
bool buildConsumersMap = false, size_t nodesCountHint = 0);
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 67d8f79d9f9..ee264e8a178 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -227,13 +227,13 @@ bool ReduceOptionalElements(const TType* type, const TArrayRef<const ui32>& test
}
std::vector<TType*> ValidateBlockFlowType(const TType* flowType) {
- const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flowType)->GetItemType());
- MKQL_ENSURE(inputTupleType->GetElementsCount() > 0, "Expected at least one column");
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flowType));
+ MKQL_ENSURE(wideComponents.size() > 0, "Expected at least one column");
std::vector<TType*> flowItems;
- flowItems.reserve(inputTupleType->GetElementsCount());
+ flowItems.reserve(wideComponents.size());
bool isScalar;
- for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) {
- auto blockType = AS_TYPE(TBlockType, inputTupleType->GetElementType(i));
+ for (size_t i = 0; i < wideComponents.size(); ++i) {
+ auto blockType = AS_TYPE(TBlockType, wideComponents[i]);
isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;
auto withoutBlock = blockType->GetItemType();
flowItems.push_back(withoutBlock);
@@ -1440,19 +1440,19 @@ TRuntimeNode TProgramBuilder::ToBlocks(TRuntimeNode flow) {
}
TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode flow) {
- TType* outputTupleType;
+ TType* outputItemType;
{
- const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
- std::vector<TType*> outputTupleItems;
- outputTupleItems.reserve(inputTupleType->GetElementsCount());
- for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) {
- outputTupleItems.push_back(NewBlockType(inputTupleType->GetElementType(i), TBlockType::EShape::Many));
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
+ std::vector<TType*> outputItems;
+ outputItems.reserve(wideComponents.size());
+ for (size_t i = 0; i < wideComponents.size(); ++i) {
+ outputItems.push_back(NewBlockType(wideComponents[i], TBlockType::EShape::Many));
}
- outputTupleItems.push_back(NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar));
- outputTupleType = NewTupleType(outputTupleItems);
+ outputItems.push_back(NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar));
+ outputItemType = NewMultiType(outputItems);
}
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputTupleType));
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputItemType));
callableBuilder.Add(flow);
return TRuntimeNode(callableBuilder.Build(), false);
}
@@ -1467,10 +1467,10 @@ TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) {
}
TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode flow) {
- auto outputTupleItems = ValidateBlockFlowType(flow.GetStaticType());
- outputTupleItems.pop_back();
- TType* outputTupleType = NewTupleType(outputTupleItems);
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputTupleType));
+ auto outputItems = ValidateBlockFlowType(flow.GetStaticType());
+ outputItems.pop_back();
+ TType* outputMultiType = NewMultiType(outputItems);
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType));
callableBuilder.Add(flow);
return TRuntimeNode(callableBuilder.Build(), false);
}
@@ -1509,17 +1509,17 @@ TRuntimeNode TProgramBuilder::BlockCompress(TRuntimeNode flow, ui32 bitmapIndex)
MKQL_ENSURE(AS_TYPE(TDataType, blockItemTypes[bitmapIndex])->GetSchemeType() == NUdf::TDataType<bool>::Id, "Expected Bool as bitmap column type");
- const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
- MKQL_ENSURE(inputTupleType->GetElementsCount() == blockItemTypes.size(), "Unexpected tuple size");
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
+ MKQL_ENSURE(wideComponents.size() == blockItemTypes.size(), "Unexpected tuple size");
std::vector<TType*> flowItems;
- for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) {
+ for (size_t i = 0; i < wideComponents.size(); ++i) {
if (i == bitmapIndex) {
continue;
}
- flowItems.push_back(inputTupleType->GetElementType(i));
+ flowItems.push_back(wideComponents[i]);
}
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(flowItems)));
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(flowItems)));
callableBuilder.Add(flow);
callableBuilder.Add(NewDataLiteral<ui32>(bitmapIndex));
return TRuntimeNode(callableBuilder.Build(), false);
@@ -1699,7 +1699,7 @@ TRuntimeNode TProgramBuilder::BuildWideTopOrSort(const std::string_view& callabl
}
}
- const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType())->GetElementsCount();
+ const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, flow.GetStaticType()));
MKQL_ENSURE(!keys.empty() && keys.size() <= width, "Unexpected keys count: " << keys.size());
TCallableBuilder callableBuilder(Env, callableName, flow.GetStaticType());
@@ -2307,6 +2307,20 @@ TRuntimeNode TProgramBuilder::NewTuple(const TArrayRef<const TRuntimeNode>& elem
return NewTuple(NewTupleType(types), elements);
}
+TType* TProgramBuilder::NewEmptyMultiType() {
+ if (RuntimeVersion > 35) {
+ return TMultiType::Create(0, nullptr, Env);
+ }
+ return Env.GetEmptyTuple()->GetGenericType();
+}
+
+TType* TProgramBuilder::NewMultiType(const TArrayRef<TType* const>& elements) {
+ if (RuntimeVersion > 35) {
+ return TMultiType::Create(elements.size(), elements.data(), Env);
+ }
+ return TTupleType::Create(elements.size(), elements.data(), Env);
+}
+
TType* TProgramBuilder::NewResourceType(const std::string_view& tag) {
return TResourceType::Create(tag, Env);
}
@@ -2799,7 +2813,7 @@ TRuntimeNode TProgramBuilder::Source() {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType({})));
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType({})));
return TRuntimeNode(callableBuilder.Build(), false);
}
@@ -3402,12 +3416,12 @@ TRuntimeNode TProgramBuilder::NarrowSqueezeToDict(TRuntimeNode flow, bool multi,
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
- itemArgs.reserve(tupleType->GetElementsCount());
+ itemArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto key = keySelector(itemArgs);
const auto keyType = key.GetStaticType();
@@ -3524,12 +3538,12 @@ TRuntimeNode TProgramBuilder::NarrowMultiMap(TRuntimeNode flow, const TWideLambd
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
- itemArgs.reserve(tupleType->GetElementsCount());
+ itemArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto newList = handler(itemArgs);
@@ -3557,7 +3571,7 @@ TRuntimeNode TProgramBuilder::ExpandMap(TRuntimeNode flow, const TExpandLambda&
tupleItems.reserve(newItems.size());
std::transform(newItems.cbegin(), newItems.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1));
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems)));
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems)));
callableBuilder.Add(flow);
callableBuilder.Add(itemArg);
std::for_each(newItems.cbegin(), newItems.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
@@ -3569,12 +3583,12 @@ TRuntimeNode TProgramBuilder::WideMap(TRuntimeNode flow, const TWideLambda& hand
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
- itemArgs.reserve(tupleType->GetElementsCount());
+ itemArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto newItems = handler(itemArgs);
@@ -3582,7 +3596,7 @@ TRuntimeNode TProgramBuilder::WideMap(TRuntimeNode flow, const TWideLambda& hand
tupleItems.reserve(newItems.size());
std::transform(newItems.cbegin(), newItems.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1));
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems)));
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems)));
callableBuilder.Add(flow);
std::for_each(itemArgs.cbegin(), itemArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
std::for_each(newItems.cbegin(), newItems.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
@@ -3594,12 +3608,12 @@ TRuntimeNode TProgramBuilder::WideChain1Map(TRuntimeNode flow, const TWideLambda
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList inputArgs;
- inputArgs.reserve(tupleType->GetElementsCount());
+ inputArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(inputArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(inputArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto initItems = init(inputArgs);
@@ -3615,7 +3629,7 @@ TRuntimeNode TProgramBuilder::WideChain1Map(TRuntimeNode flow, const TWideLambda
MKQL_ENSURE(initItems.size() == updateItems.size(), "Expected same width.");
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems)));
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems)));
callableBuilder.Add(flow);
std::for_each(inputArgs.cbegin(), inputArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
std::for_each(initItems.cbegin(), initItems.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
@@ -3629,12 +3643,12 @@ TRuntimeNode TProgramBuilder::NarrowMap(TRuntimeNode flow, const TNarrowLambda&
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
- itemArgs.reserve(tupleType->GetElementsCount());
+ itemArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto newItem = handler(itemArgs);
@@ -3650,12 +3664,12 @@ TRuntimeNode TProgramBuilder::NarrowFlatMap(TRuntimeNode flow, const TNarrowLamb
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
- itemArgs.reserve(tupleType->GetElementsCount());
+ itemArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto newList = handler(itemArgs);
const auto type = newList.GetStaticType();
@@ -3685,12 +3699,12 @@ TRuntimeNode TProgramBuilder::BuildWideFilter(const std::string_view& callableNa
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
- itemArgs.reserve(tupleType->GetElementsCount());
+ itemArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto predicate = handler(itemArgs);
@@ -3722,12 +3736,12 @@ TRuntimeNode TProgramBuilder::WideSkipWhileInclusive(TRuntimeNode flow, const TN
}
TRuntimeNode TProgramBuilder::WideFilter(TRuntimeNode flow, TRuntimeNode limit, const TNarrowLambda& handler) {
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
- itemArgs.reserve(tupleType->GetElementsCount());
+ itemArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto predicate = handler(itemArgs);
@@ -4543,13 +4557,13 @@ TRuntimeNode TProgramBuilder::WideCombiner(TRuntimeNode flow, ui64 memLimit, con
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
- itemArgs.reserve(tupleType->GetElementsCount());
+ itemArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto keys = extractor(itemArgs);
@@ -4580,7 +4594,7 @@ TRuntimeNode TProgramBuilder::WideCombiner(TRuntimeNode flow, ui64 memLimit, con
tupleItems.reserve(output.size());
std::transform(output.cbegin(), output.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1));
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems)));
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems)));
callableBuilder.Add(flow);
callableBuilder.Add(NewDataLiteral(memLimit));
callableBuilder.Add(NewDataLiteral(ui32(keyArgs.size())));
@@ -4602,13 +4616,13 @@ TRuntimeNode TProgramBuilder::WideLastCombiner(TRuntimeNode flow, const TWideLam
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
- itemArgs.reserve(tupleType->GetElementsCount());
+ itemArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto keys = extractor(itemArgs);
@@ -4639,7 +4653,7 @@ TRuntimeNode TProgramBuilder::WideLastCombiner(TRuntimeNode flow, const TWideLam
tupleItems.reserve(output.size());
std::transform(output.cbegin(), output.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1));
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems)));
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems)));
callableBuilder.Add(flow);
callableBuilder.Add(NewDataLiteral(ui32(keyArgs.size())));
callableBuilder.Add(NewDataLiteral(ui32(stateArgs.size())));
@@ -4660,13 +4674,13 @@ TRuntimeNode TProgramBuilder::WideCondense1(TRuntimeNode flow, const TWideLambda
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
- itemArgs.reserve(tupleType->GetElementsCount());
+ itemArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto first = init(itemArgs);
@@ -4683,7 +4697,7 @@ TRuntimeNode TProgramBuilder::WideCondense1(TRuntimeNode flow, const TWideLambda
tupleItems.reserve(next.size());
std::transform(next.cbegin(), next.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1));
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems)));
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems)));
callableBuilder.Add(flow);
std::for_each(itemArgs.cbegin(), itemArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
std::for_each(first.cbegin(), first.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1));
@@ -4845,13 +4859,13 @@ TRuntimeNode TProgramBuilder::WideChopper(TRuntimeNode flow, const TWideLambda&
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs, keyArgs;
- itemArgs.reserve(tupleType->GetElementsCount());
+ itemArgs.reserve(wideComponents.size());
auto i = 0U;
- std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); });
+ std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); });
const auto keys = extractor(itemArgs);
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index d20e1adb1cc..5c6348bd2b2 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -194,6 +194,10 @@ public:
TRuntimeNode NewEmptyTuple();
TRuntimeNode NewTuple(TType* tupleType, const TArrayRef<const TRuntimeNode>& elements);
TRuntimeNode NewTuple(const TArrayRef<const TRuntimeNode>& elements);
+
+ TType* NewEmptyMultiType();
+ TType* NewMultiType(const TArrayRef<TType* const>& elements);
+
TType* NewResourceType(const std::string_view& tag);
TType* NewVariantType(TType* underlyingType);
TRuntimeNode NewVariant(TRuntimeNode item, ui32 tupleIndex, TType* variantType);
diff --git a/ydb/library/yql/providers/common/mkql/yql_type_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_type_mkql.cpp
index 5a2123e13fc..b86b9fba726 100644
--- a/ydb/library/yql/providers/common/mkql/yql_type_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_type_mkql.cpp
@@ -92,7 +92,7 @@ NKikimr::NMiniKQL::TType* BuildType(const TTypeAnnotationNode& annotation, NKiki
return nullptr;
}
}
- return pgmBuilder.NewTupleType(elements);
+ return pgmBuilder.NewMultiType(elements);
}
case ETypeAnnotationKind::Dict: {
@@ -330,6 +330,7 @@ const TTypeAnnotationNode* ConvertMiniKQLType(TPosition position, NKikimr::NMini
}
case TType::EKind::Any:
+ case TType::EKind::ReservedKind:
YQL_ENSURE(false, "Not supported");
break;
@@ -393,9 +394,36 @@ const TTypeAnnotationNode* ConvertMiniKQLType(TPosition position, NKikimr::NMini
}
}
- default:
- YQL_ENSURE(false, "Unknown kind");
+ case TType::EKind::Flow:
+ {
+ auto flowType = static_cast<TFlowType*>(type);
+ auto itemType = ConvertMiniKQLType(position, flowType->GetItemType(), ctx);
+ return ctx.MakeType<TFlowExprType>(itemType);
+ }
+
+ case TType::EKind::Pg:
+ {
+ auto pgType = static_cast<TPgType*>(type);
+ return ctx.MakeType<TPgExprType>(pgType->GetTypeId());
}
+
+ case TType::EKind::Multi:
+ {
+ TTypeAnnotationNode::TListType elements;
+ auto multiType = static_cast<TMultiType*>(type);
+ for (ui32 index = 0; index < multiType->GetElementsCount(); ++index) {
+ auto elementType = ConvertMiniKQLType(position, multiType->GetElementType(index), ctx);
+ elements.push_back(elementType);
+ }
+
+ auto res = ctx.MakeType<TMultiExprType>(elements);
+ YQL_ENSURE(res->Validate(position, ctx));
+ return res;
+ }
+
+ }
+
+ YQL_ENSURE(false, "Unknown kind");
}
} // namespace NCommon