diff options
author | aneporada <aneporada@ydb.tech> | 2023-03-14 11:57:34 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-03-14 11:57:34 +0300 |
commit | 5d3df1b07e18c2aa5c2a7f5326b966ea3c55e296 (patch) | |
tree | b01e2f3c81296899f3d3724c133334a1dd169614 | |
parent | 9b9393c30c6fd29ba2bb4a0ba0ef2dd7ce0e1e72 (diff) | |
download | ydb-5d3df1b07e18c2aa5c2a7f5326b966ea3c55e296.tar.gz |
Support TMultiType on minikql level
41 files changed, 526 insertions, 354 deletions
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp index 78a230e7d3f..de26cb20d21 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.cpp +++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp @@ -131,7 +131,7 @@ TType* MakeWideFlowType(TProgramBuilder& builder, TStructType* rowType) { tupleItems.push_back(rowType->GetMemberType(i)); } - return builder.NewFlowType(builder.NewTupleType(tupleItems)); + return builder.NewFlowType(builder.NewMultiType(tupleItems)); } TType* MakeBlockType(TProgramBuilder& builder, TStructType* rowType) { @@ -175,8 +175,7 @@ TRuntimeNode TKqpProgramBuilder::KqpWideReadTable(const TTableId& tableId, const MKQL_ENSURE_S(returnType); MKQL_ENSURE_S(returnType->IsFlow()); - const auto itemType = AS_TYPE(TFlowType, returnType)->GetItemType(); - MKQL_ENSURE_S(itemType->IsTuple()); + GetWideComponents(AS_TYPE(TFlowType, returnType)); TCallableBuilder builder(Env, __func__, returnType); builder.Add(BuildTableIdLiteral(tableId, *this)); @@ -199,8 +198,7 @@ TRuntimeNode TKqpProgramBuilder::KqpWideReadTableRanges(const TTableId& tableId, } else { MKQL_ENSURE_S(returnType); MKQL_ENSURE_S(returnType->IsFlow()); - const auto itemType = AS_TYPE(TFlowType, returnType)->GetItemType(); - MKQL_ENSURE_S(itemType->IsTuple()); + GetWideComponents(AS_TYPE(TFlowType, returnType)); } TCallableBuilder builder(Env, __func__, returnType); @@ -223,8 +221,7 @@ TRuntimeNode TKqpProgramBuilder::KqpBlockReadTableRanges(const TTableId& tableId } else { MKQL_ENSURE_S(returnType); MKQL_ENSURE_S(returnType->IsFlow()); - const auto itemType = AS_TYPE(TFlowType, returnType)->GetItemType(); - MKQL_ENSURE_S(itemType->IsTuple()); + GetWideComponents(AS_TYPE(TFlowType, returnType)); } TCallableBuilder builder(Env, __func__, returnType); diff --git a/ydb/core/kqp/runtime/kqp_read_table.cpp b/ydb/core/kqp/runtime/kqp_read_table.cpp index d86b28999a7..b2dd91853a4 100644 --- a/ydb/core/kqp/runtime/kqp_read_table.cpp +++ b/ydb/core/kqp/runtime/kqp_read_table.cpp @@ -473,49 +473,43 @@ private: TParseReadTableRangesResult ParseResult; }; +std::vector<EValueRepresentation> BuildRepresentations(const TType* type) { + std::vector<EValueRepresentation> representations; + + auto wideComponents = type->IsFlow() ? + GetWideComponents(AS_TYPE(TFlowType, type)) : + AS_TYPE(TTupleType, AS_TYPE(TStreamType, type)->GetItemType())->GetElements(); + + representations.reserve(wideComponents.size()); + for (ui32 i = 0U; i < wideComponents.size(); ++i) { + representations.emplace_back(GetValueRepresentation(wideComponents[i])); + } + + return representations; +} + } // namespace IComputationNode* WrapKqpScanWideReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx, TKqpScanComputeContext& computeCtx) { - std::vector<EValueRepresentation> representations; - auto parseResult = ParseWideReadTableRanges(callable); auto rangesNode = LocateNode(ctx.NodeLocator, *parseResult.Ranges); const auto type = callable.GetType()->GetReturnType(); - const auto returnItemType = type->IsFlow() ? - AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType(): - AS_TYPE(TStreamType, callable.GetType()->GetReturnType())->GetItemType(); - - const auto tupleType = AS_TYPE(TTupleType, returnItemType); - - representations.reserve(tupleType->GetElementsCount()); - for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) - representations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i))); - + auto representations = BuildRepresentations(type); return new TKqpScanWideReadTableRangesWrapper(computeCtx, parseResult, rangesNode, std::move(representations)); } IComputationNode* WrapKqpScanWideReadTable(TCallable& callable, const TComputationNodeFactoryContext& ctx, TKqpScanComputeContext& computeCtx) { - std::vector<EValueRepresentation> representations; - auto parseResult = ParseWideReadTable(callable); auto fromNode = LocateNode(ctx.NodeLocator, *parseResult.FromTuple); auto toNode = LocateNode(ctx.NodeLocator, *parseResult.ToTuple); const auto type = callable.GetType()->GetReturnType(); - const auto returnItemType = type->IsFlow() ? - AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType(): - AS_TYPE(TStreamType, callable.GetType()->GetReturnType())->GetItemType(); - - const auto tupleType = AS_TYPE(TTupleType, returnItemType); - - representations.reserve(tupleType->GetElementsCount()); - for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) - representations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i))); + auto representations = BuildRepresentations(type); return new TKqpScanWideReadTableWrapper(computeCtx, parseResult, fromNode, toNode, std::move(representations)); } @@ -523,22 +517,11 @@ IComputationNode* WrapKqpScanWideReadTable(TCallable& callable, const TComputati IComputationNode* WrapKqpScanBlockReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx, TKqpScanComputeContext& computeCtx) { - std::vector<EValueRepresentation> representations; - auto parseResult = ParseWideReadTableRanges(callable); auto rangesNode = LocateNode(ctx.NodeLocator, *parseResult.Ranges); const auto type = callable.GetType()->GetReturnType(); - const auto returnItemType = type->IsFlow() ? - AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType(): - AS_TYPE(TStreamType, callable.GetType()->GetReturnType())->GetItemType(); - - const auto tupleType = AS_TYPE(TTupleType, returnItemType); - - representations.reserve(tupleType->GetElementsCount()); - for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) - representations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i))); - + auto representations = BuildRepresentations(type); return new TKqpScanBlockReadTableRangesWrapper(computeCtx, parseResult, rangesNode, std::move(representations)); } diff --git a/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp b/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp index be86991d410..376156f7f13 100644 --- a/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp +++ b/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp @@ -75,6 +75,10 @@ public: Out << "BlockType"; } + void Visit(TMultiType&) override { + Out << "MultiType"; + } + // Values void Visit(TVoid&) override { Out << "void"; diff --git a/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp b/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp index 94b656e4bb8..09449f9cbee 100644 --- a/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp +++ b/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp @@ -509,6 +509,7 @@ bool IsArrowCompatible(const NKikimr::NMiniKQL::TType* type) { case TType::EKind::Flow: case TType::EKind::Tagged: case TType::EKind::Pg: + case TType::EKind::Multi: return false; } return false; diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp index 5597ad4023c..204c9e02019 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.cpp +++ b/ydb/library/yql/dq/runtime/dq_transport.cpp @@ -388,6 +388,7 @@ ui64 EstimateSizeImpl(const NUdf::TUnboxedValuePod& value, const NKikimr::NMiniK case TType::EKind::Flow: case TType::EKind::ReservedKind: case TType::EKind::Block: + case TType::EKind::Multi: THROW yexception() << "Unsupported type: " << type->GetKindAsStr(); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp index 8e8af714913..00edfc9ed65 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp @@ -386,12 +386,12 @@ size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) { size_t CalcMaxBlockLenForOutput(TType* out) { const auto outputType = AS_TYPE(TFlowType, out); - const auto outputTupleType = AS_TYPE(TTupleType, outputType->GetItemType()); - MKQL_ENSURE(outputTupleType->GetElementsCount() > 0, "Expecting at least one output column"); + const auto wideComponents = GetWideComponents(outputType); + MKQL_ENSURE(wideComponents.size() > 0, "Expecting at least one output column"); size_t maxBlockItemSize = 0; - for (ui32 i = 0; i < outputTupleType->GetElementsCount() - 1; ++i) { - auto type = AS_TYPE(TBlockType, outputTupleType->GetElementType(i)); + for (ui32 i = 0; i < wideComponents.size() - 1; ++i) { + auto type = AS_TYPE(TBlockType, wideComponents[i]); MKQL_ENSURE(type->GetShape() != TBlockType::EShape::Scalar, "Expecting block type"); maxBlockItemSize = std::max(maxBlockItemSize, CalcMaxBlockItemSize(type->GetItemType())); } @@ -1423,7 +1423,8 @@ void FillAggStreams(TRuntimeNode streamsNode, TVector<TVector<ui32>>& streams) { IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args"); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + const auto wideComponents = GetWideComponents(flowType); + const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env); auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); @@ -1443,7 +1444,8 @@ IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNod IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + const auto wideComponents = GetWideComponents(flowType); + const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env); auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); @@ -1488,7 +1490,8 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args"); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + const auto wideComponents = GetWideComponents(flowType); + const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env); auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); @@ -1519,7 +1522,8 @@ IComputationNode* WrapBlockMergeFinalizeHashed(TCallable& callable, const TCompu IComputationNode* WrapBlockMergeManyFinalizeHashed(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args"); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + const auto wideComponents = GetWideComponents(flowType); + const auto tupleType = TTupleType::Create(wideComponents.size(), wideComponents.data(), ctx.Env); auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp index 726adcd5b1f..bd83ebe6f98 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_compress.cpp @@ -357,8 +357,8 @@ IComputationNode* WrapBlockCompress(TCallable& callable, const TComputationNodeF MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args, got " << callable.GetInputsCount()); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); - const ui32 width = tupleType->GetElementsCount(); + const auto wideComponents = GetWideComponents(flowType); + const ui32 width = wideComponents.size(); MKQL_ENSURE(width > 1, "Expected at least two columns"); const auto indexData = AS_VALUE(TDataLiteral, callable.GetInput(1U)); @@ -369,7 +369,7 @@ IComputationNode* WrapBlockCompress(TCallable& callable, const TComputationNodeF bool bitmapIsScalar = false; bool allScalars = true; for (ui32 i = 0; i < width; ++i) { - types.push_back(AS_TYPE(TBlockType, tupleType->GetElementType(i))); + types.push_back(AS_TYPE(TBlockType, wideComponents[i])); bool isScalar = types.back()->GetShape() == TBlockType::EShape::Scalar; if (i == width - 1) { MKQL_ENSURE(isScalar, "Expecting scalar block size as last column"); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp index b60617c24af..3434148e261 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_skiptake.cpp @@ -142,8 +142,8 @@ IComputationNode* WrapSkipTake(bool skip, TCallable& callable, const TComputatio MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args"); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); - MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column"); + const auto flowWidth = GetWideComponentsCount(flowType); + MKQL_ENSURE(flowWidth > 0, "Expected at least one column"); auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); @@ -153,9 +153,9 @@ IComputationNode* WrapSkipTake(bool skip, TCallable& callable, const TComputatio MKQL_ENSURE(countType->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected ui64"); if (skip) { - return new TWideSkipBlocksWrapper(ctx.Mutables, wideFlow, count, tupleType->GetElementsCount()); + return new TWideSkipBlocksWrapper(ctx.Mutables, wideFlow, count, flowWidth); } - return new TWideTakeBlocksWrapper(ctx.Mutables, wideFlow, count, tupleType->GetElementsCount()); + return new TWideTakeBlocksWrapper(ctx.Mutables, wideFlow, count, flowWidth); } } //namespace diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp index cfc075e855d..9b53c713ef3 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_top.cpp @@ -25,16 +25,16 @@ class TTopOrSortBlocksWrapper : public TStatefulWideFlowBlockComputationNode<TTo using TBase = TStatefulWideFlowBlockComputationNode<TSelf>; using TChunkedArrayIndex = TVector<IArrayBuilder::TArrayDataItem>; public: - TTopOrSortBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TTupleType* tupleType, IComputationNode* count, + TTopOrSortBlocksWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, TArrayRef<TType* const> wideComponents, IComputationNode* count, TVector<IComputationNode*>&& directions, TVector<ui32>&& keyIndicies) - : TBase(mutables, flow, tupleType->GetElementsCount()) + : TBase(mutables, flow, wideComponents.size()) , Flow_(flow) , Count_(count) , Directions_(std::move(directions)) , KeyIndicies_(std::move(keyIndicies)) { - for (ui32 i = 0; i < tupleType->GetElementsCount() - 1; ++i) { - Columns_.push_back(AS_TYPE(TBlockType, tupleType->GetElementType(i))); + for (ui32 i = 0; i < wideComponents.size() - 1; ++i) { + Columns_.push_back(AS_TYPE(TBlockType, wideComponents[i])); } } @@ -502,8 +502,8 @@ IComputationNode* WrapTopOrSort(TCallable& callable, const TComputationNodeFacto MKQL_ENSURE(inputsWithCount > 2U && !(inputsWithCount % 2U), "Expected more arguments."); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); - MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column"); + const auto wideComponents = GetWideComponents(flowType); + MKQL_ENSURE(wideComponents.size() > 0, "Expected at least one column"); auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); @@ -519,12 +519,12 @@ IComputationNode* WrapTopOrSort(TCallable& callable, const TComputationNodeFacto TVector<ui32> keyIndicies; for (ui32 i = 2; i < inputsWithCount; i += 2) { ui32 keyIndex = AS_VALUE(TDataLiteral, callable.GetInput(i - offset))->AsValue().Get<ui32>(); - MKQL_ENSURE(keyIndex + 1 < tupleType->GetElementsCount(), "Wrong key index"); + MKQL_ENSURE(keyIndex + 1 < wideComponents.size(), "Wrong key index"); keyIndicies.push_back(keyIndex); directions.push_back(LocateNode(ctx.NodeLocator, callable, i + 1 - offset)); } - return new TTopOrSortBlocksWrapper<Sort, HasCount>(ctx.Mutables, wideFlow, tupleType, count, std::move(directions), std::move(keyIndicies)); + return new TTopOrSortBlocksWrapper<Sort, HasCount>(ctx.Mutables, wideFlow, wideComponents, count, std::move(directions), std::move(keyIndicies)); } } //namespace diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index 4f747fb7482..2074fab98c1 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -473,12 +473,8 @@ IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFa MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); - TVector<TType*> items; - for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - items.push_back(tupleType->GetElementType(i)); - } - + const auto wideComponents = GetWideComponents(flowType); + TVector<TType*> items(wideComponents.begin(), wideComponents.end()); auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); @@ -497,11 +493,11 @@ IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNode MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); - MKQL_ENSURE(tupleType->GetElementsCount() > 0, "Expected at least one column"); + const auto wideComponents = GetWideComponents(flowType); + MKQL_ENSURE(wideComponents.size() > 0, "Expected at least one column"); TVector<TType*> items; - for (ui32 i = 0; i < tupleType->GetElementsCount() - 1; ++i) { - const auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(i)); + for (ui32 i = 0; i < wideComponents.size() - 1; ++i) { + const auto blockType = AS_TYPE(TBlockType, wideComponents[i]); items.push_back(blockType->GetItemType()); } @@ -521,12 +517,12 @@ IComputationNode* WrapBlockExpandChunked(TCallable& callable, const TComputation MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + const auto wideComponents = GetWideComponents(flowType); auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - return new TBlockExpandChunkedWrapper(ctx.Mutables, wideFlow, tupleType->GetElementsCount()); + return new TBlockExpandChunkedWrapper(ctx.Mutables, wideFlow, wideComponents.size()); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_discard.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_discard.cpp index 88a92bcd4e2..78c20eb3cf8 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_discard.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_discard.cpp @@ -3,6 +3,7 @@ #include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/mkql_runtime_version.h> namespace NKikimr { namespace NMiniKQL { @@ -223,13 +224,15 @@ IComputationNode* WrapDiscard(TCallable& callable, const TComputationNodeFactory const auto type = callable.GetType()->GetReturnType(); const auto flow = LocateNode(ctx.NodeLocator, callable, 0); if (type->IsFlow()) { - if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) - if (const auto itemType = AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType(); itemType->IsTuple()) - return new TDiscardWideFlowWrapper(wide, AS_TYPE(TTupleType, itemType)->GetElementsCount()); - else - return new TDiscardWideFlowWrapper(wide, 0U); - else + if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) { + auto flowType = AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType()); + if (RuntimeVersion > 35 && flowType->GetItemType()->IsMulti() || flowType->GetItemType()->IsTuple()) { + return new TDiscardWideFlowWrapper(wide, GetWideComponentsCount(flowType)); + } + return new TDiscardWideFlowWrapper(wide, 0U); + } else { return new TDiscardFlowWrapper(flow); + } } else if (type->IsStream()) { return new TDiscardWrapper(ctx.Mutables, flow); } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp index a6ea5576ca4..811d2463eca 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp @@ -1712,7 +1712,7 @@ IComputationNode* WrapFlatMap(TCallable& callable, const TComputationNodeFactory IComputationNode* WrapNarrowFlatMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() > 1U, "Expected at least two args."); - const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount(); + const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); MKQL_ENSURE(callable.GetInputsCount() == width + 2U, "Wrong signature."); const auto last = callable.GetInputsCount() - 1U; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 4388da96073..29dfbf2cdd7 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -851,8 +851,8 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto const auto leftFlowNode = callable.GetInput(0); const auto rightFlowNode = callable.GetInput(1); - const auto leftFlowTupleType = AS_TYPE(TFlowType, leftFlowNode)->GetItemType(); - const auto rightFlowTupleType = AS_TYPE(TFlowType, rightFlowNode)->GetItemType(); + const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode)); + const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode)); const auto joinKindNode = callable.GetInput(2); const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3)); const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(4)); @@ -863,17 +863,16 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0)); const auto flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1)); - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType()); + const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); std::vector<EValueRepresentation> outputRepresentations; - outputRepresentations.reserve(tupleType->GetElementsCount()); - for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) - outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i))); + outputRepresentations.reserve(outputFlowComponents.size()); + for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) { + outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i])); + } std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames; - std::vector<TType *> leftColumnsTypes, rightColumnsTypes; - - leftColumnsTypes.resize(AS_TYPE(TTupleType, leftFlowTupleType)->GetElementsCount()); - rightColumnsTypes.resize(AS_TYPE(TTupleType, rightFlowTupleType)->GetElementsCount()); + std::vector<TType*> leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end()); + std::vector<TType*> rightColumnsTypes(rightFlowComponents.begin(), rightFlowComponents.end()); leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount()); for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) { @@ -895,14 +894,6 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>()); } - for (ui32 i = 0; i < leftColumnsTypes.size(); ++i) { - leftColumnsTypes[i] = AS_TYPE(TTupleType, leftFlowTupleType)->GetElementType(i); - } - - for (ui32 i = 0; i < rightColumnsTypes.size(); ++i) { - rightColumnsTypes[i] = AS_TYPE(TTupleType, rightFlowTupleType)->GetElementType(i); - } - return new TGraceJoinWrapper( ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp index 04946f18b93..3164ad4eeed 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp @@ -2058,6 +2058,15 @@ IComputationNode* WrapCommonJoinCore(TCallable& callable, const TComputationNode fieldTypes.emplace_back(tupleType->GetElementType(i)); inputRepresentations.emplace_back(GetValueRepresentation(fieldTypes.back())); } + } else if (inputRowType->IsMulti()) { + const auto tupleType = AS_TYPE(TMultiType, inputRowType); + inputRepresentations.reserve(tupleType->GetElementsCount()); + fieldTypes.reserve(tupleType->GetElementsCount()); + for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) { + fieldTypes.emplace_back(tupleType->GetElementType(i)); + inputRepresentations.emplace_back(GetValueRepresentation(fieldTypes.back())); + } + } else if (inputRowType->IsStruct()) { const auto structType = AS_TYPE(TStructType, inputRowType); inputRepresentations.reserve(structType->GetMembersCount()); @@ -2078,6 +2087,11 @@ IComputationNode* WrapCommonJoinCore(TCallable& callable, const TComputationNode outputRepresentations.reserve(tupleType->GetElementsCount()); for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i))); + } else if (outputRowType->IsMulti()) { + const auto tupleType = AS_TYPE(TMultiType, outputRowType); + outputRepresentations.reserve(tupleType->GetElementsCount()); + for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) + outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i))); } else if (outputRowType->IsStruct()) { const auto structType = AS_TYPE(TStructType, outputRowType); outputRepresentations.reserve(structType->GetMembersCount()); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp index f7c3ddfc107..b37f588c707 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_map_join.cpp @@ -1878,7 +1878,9 @@ IComputationNode* WrapMapJoinCore(TCallable& callable, const TComputationNodeFac for (ui32 i = 0; i < leftKeyColumns.size(); ++i) { const auto leftColumnType = leftItemType->IsTuple() ? AS_TYPE(TTupleType, leftItemType)->GetElementType(leftKeyColumns[i]): - AS_TYPE(TStructType, leftItemType)->GetMemberType(leftKeyColumns[i]); + (leftItemType->IsMulti() ? + AS_TYPE(TMultiType, leftItemType)->GetElementType(leftKeyColumns[i]): + AS_TYPE(TStructType, leftItemType)->GetMemberType(leftKeyColumns[i])); const auto rightType = isTupleKey ? AS_TYPE(TTupleType, dictKeyType)->GetElementType(i) : dictKeyType; bool isOptional; if (UnpackOptional(leftColumnType, isOptional)->IsSameType(*rightType)) { @@ -1901,6 +1903,11 @@ IComputationNode* WrapMapJoinCore(TCallable& callable, const TComputationNodeFac outputRepresentations.reserve(tupleType->GetElementsCount()); for (ui32 i = 0U; i < tupleType->GetElementsCount(); ++i) outputRepresentations.emplace_back(GetValueRepresentation(tupleType->GetElementType(i))); + } else if (returnItemType->IsMulti()) { + const auto multiType = AS_TYPE(TMultiType, returnItemType); + outputRepresentations.reserve(multiType->GetElementsCount()); + for (ui32 i = 0U; i < multiType->GetElementsCount(); ++i) + outputRepresentations.emplace_back(GetValueRepresentation(multiType->GetElementType(i))); } else if (returnItemType->IsStruct()) { const auto structType = AS_TYPE(TStructType, returnItemType); outputRepresentations.reserve(structType->GetMembersCount()); @@ -1914,7 +1921,7 @@ IComputationNode* WrapMapJoinCore(TCallable& callable, const TComputationNodeFac #define NEW_WRAPPER(KIND, RIGHT_REQ, IS_TUPLE) \ if (type->IsFlow()) { \ if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) { \ - const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount(); \ + const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); \ if (boolWithoutRight) \ return new TWideMapJoinWrapper<true, RIGHT_REQ, IS_TUPLE>(ctx.Mutables, \ std::move(leftKeyConverters), dictType, std::move(outputRepresentations), \ diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp index 7f49641dd59..dc4eb919483 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp @@ -548,7 +548,8 @@ IComputationNode* WrapMultiMap(TCallable& callable, const TComputationNodeFactor IComputationNode* WrapNarrowMultiMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() > 2U, "Expected at least three arguments."); - const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount(); + auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); + const auto width = wideComponents.size(); MKQL_ENSURE(callable.GetInputsCount() > width + 2U, "Wrong signature."); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp index 78451056a88..6d1905e9fc8 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_skip.cpp @@ -346,7 +346,7 @@ IComputationNode* WrapSkip(TCallable& callable, const TComputationNodeFactoryCon const auto count = LocateNode(ctx.NodeLocator, callable, 1); if (type->IsFlow()) { if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) - return new TWideSkipWrapper(ctx.Mutables, wide, count, AS_TYPE(TTupleType, AS_TYPE(TFlowType, type)->GetItemType())->GetElementsCount()); + return new TWideSkipWrapper(ctx.Mutables, wide, count, GetWideComponentsCount(AS_TYPE(TFlowType, type))); else return new TSkipFlowWrapper(ctx.Mutables, GetValueRepresentation(type), flow, count); } else if (type->IsStream()) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_source.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_source.cpp index 20cdca896be..d53311791f5 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_source.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_source.cpp @@ -74,7 +74,7 @@ IComputationNode* WrapSourceOf(TCallable& callable, const TComputationNodeFactor IComputationNode* WrapSource(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(!callable.GetInputsCount(), "Expected no args."); - MKQL_ENSURE(!AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount(), "Expected zero width of output flow."); + MKQL_ENSURE(!GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())), "Expected zero width of output flow."); return new TSourceWrapper; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp index cd677661aa5..5c5e5a4bd85 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp @@ -277,8 +277,8 @@ private: IComputationNode* WrapWideChain1Map(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() > 0U, "Expected argument."); - const auto inputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount(); - const auto outputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount(); + const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); + const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); MKQL_ENSURE(callable.GetInputsCount() == inputWidth + outputWidth * 3U + 1U, "Wrong signature."); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp index fa4bc5df4c2..e06568fa321 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp @@ -341,7 +341,8 @@ private: IComputationNode* WrapWideChopper(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() >= 4U, "Expected at least four args."); - const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount(); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); + const ui32 width = wideComponents.size(); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); const auto keysSize = (callable.GetInputsCount() - width - 4U) >> 1U; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index b2c7698bd2a..16b86f90739 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -1054,8 +1054,8 @@ template<bool Last> IComputationNode* WrapWideCombinerT(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() >= (Last ? 3U : 4U), "Expected more arguments."); - const auto inputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount(); - const auto outputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount(); + const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); + const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp index b65ba68c214..b8a1ff2ed67 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp @@ -253,8 +253,8 @@ private: IComputationNode* WrapWideCondense1(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() >= 2U, "Expected at least two args."); - const auto inputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount(); - const auto outputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount(); + const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); + const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp index a3fb6dd736d..a4d4606bc52 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp @@ -399,7 +399,7 @@ private: template<bool TakeOrSkip, bool Inclusive> IComputationNode* WrapWideWhile(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount(); + const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); MKQL_ENSURE(callable.GetInputsCount() == width + 2U, "Expected 3 or more args."); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); @@ -424,7 +424,7 @@ IComputationNode* WrapWideWhile(TCallable& callable, const TComputationNodeFacto } IComputationNode* WrapWideFilter(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount(); + const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); MKQL_ENSURE(callable.GetInputsCount() == width + 2U || callable.GetInputsCount() == width + 3U, "Expected 3 or more args."); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp index 32586e61a8a..4ca15daf5e5 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp @@ -259,7 +259,7 @@ private: } IComputationNode* WrapExpandMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount(); + const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); MKQL_ENSURE(callable.GetInputsCount() == width + 2U, "Expected two or more args."); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); @@ -273,8 +273,8 @@ IComputationNode* WrapExpandMap(TCallable& callable, const TComputationNodeFacto IComputationNode* WrapWideMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() > 0U, "Expected argument."); - const auto inputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount(); - const auto outputWidth = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType())->GetElementsCount(); + const auto inputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); + const auto outputWidth = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); MKQL_ENSURE(callable.GetInputsCount() == inputWidth + outputWidth + 1U, "Wrong signature."); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); @@ -295,7 +295,7 @@ IComputationNode* WrapWideMap(TCallable& callable, const TComputationNodeFactory IComputationNode* WrapNarrowMap(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() > 1U, "Expected two or more args."); - const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())->GetItemType())->GetElementsCount(); + const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, callable.GetInput(0U).GetStaticType())); MKQL_ENSURE(callable.GetInputsCount() == width + 2U, "Wrong signature."); const auto flow = LocateNode(ctx.NodeLocator, callable, 0U); if (const auto wide = dynamic_cast<IComputationWideFlowNode*>(flow)) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp index ae59e906c70..39aa47b86dd 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp @@ -516,8 +516,8 @@ IComputationNode* WrapWideTopT(TCallable& callable, const TComputationNodeFactor } const auto keyWidth = (inputsWithCount >> 1U) - 1U; - const auto inputType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, callable.GetType()->GetReturnType())->GetItemType()); - std::vector<ui32> indexes(inputType->GetElementsCount()); + const auto inputWideComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); + std::vector<ui32> indexes(inputWideComponents.size()); TKeyTypes keyTypes(keyWidth); std::unordered_set<ui32> keyIndexes; @@ -525,7 +525,7 @@ IComputationNode* WrapWideTopT(TCallable& callable, const TComputationNodeFactor const auto keyIndex = AS_VALUE(TDataLiteral, callable.GetInput(((i + 1U) << 1U) - offset))->AsValue().Get<ui32>(); indexes[i] = keyIndex; keyIndexes.emplace(keyIndex); - keyTypes[i].first = *UnpackOptionalData(inputType->GetElementType(keyIndex), keyTypes[i].second)->GetDataSlot(); + keyTypes[i].first = *UnpackOptionalData(inputWideComponents[keyIndex], keyTypes[i].second)->GetDataSlot(); } size_t payloadPos = keyTypes.size(); @@ -537,9 +537,9 @@ IComputationNode* WrapWideTopT(TCallable& callable, const TComputationNodeFactor indexes[payloadPos++] = i; } - std::vector<EValueRepresentation> representations(inputType->GetElementsCount()); + std::vector<EValueRepresentation> representations(inputWideComponents.size()); for (auto i = 0U; i < representations.size(); ++i) - representations[i] = GetValueRepresentation(inputType->GetElementType(indexes[i])); + representations[i] = GetValueRepresentation(inputWideComponents[indexes[i]]); TComputationNodePtrVector directions(keyWidth); auto index = 1U - offset; diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp index 4b12ffc7c35..830ee7152ad 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_block_skiptake_ut.cpp @@ -114,7 +114,7 @@ TRuntimeNode MakeFlow(TSetup_& setup) { TProgramBuilder& pb = *setup.PgmBuilder; TCallableBuilder callableBuilder(*setup.Env, "TestBlockFlow", pb.NewFlowType( - pb.NewTupleType({ + pb.NewMultiType({ pb.NewBlockType(pb.NewDataType(NUdf::EDataSlot::Uint64), TBlockType::EShape::Many), pb.NewBlockType(pb.NewDataType(NUdf::EDataSlot::Uint64), TBlockType::EShape::Scalar), pb.NewBlockType(pb.NewDataType(NUdf::EDataSlot::Uint64), TBlockType::EShape::Scalar), diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp index 8ad3b86c169..2bb910d4ad9 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp @@ -370,7 +370,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -444,7 +444,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -507,7 +507,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -575,7 +575,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { pb.NewTuple({key4, payload6}) }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -643,7 +643,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { pb.NewTuple({key4, payload6}) }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -716,7 +716,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); @@ -851,7 +851,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { pb.NewTuple({key4, payload6}) }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); @@ -912,7 +912,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { pb.NewTuple({key4, payload6}) }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); @@ -977,7 +977,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { pb.NewTuple({key4, payload6}) }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); @@ -1039,7 +1039,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { pb.NewTuple({key4, payload6}) }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -1112,7 +1112,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { pb.NewTuple({key4, payload6}) }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); @@ -1172,7 +1172,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); @@ -1236,7 +1236,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -1312,7 +1312,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { pb.NewTuple({key4, payload6}) }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); @@ -1377,7 +1377,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { pb.NewTuple({key4, payload6}) }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); @@ -1440,7 +1440,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -1517,7 +1517,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp index fb25e1ad1be..89e2827eb41 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp @@ -20,7 +20,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreTupleTest) { const auto list = pb.NewList(tupleType, {data1, data3, data2, data4}); - const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType})); + const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType})); const auto pgmReturn = pb.Collect(pb.CommonJoinCore(pb.ToFlow(list), EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, std::nullopt, EAnyJoinSettings::None, 3U, outputType)); const auto graph = setup.BuildGraph(pgmReturn); @@ -57,7 +57,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreTupleTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4}); - const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType})); + const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType})); const auto pgmReturn = pb.Collect(pb.CommonJoinCore(pb.ToFlow(list), EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, {0U}, EAnyJoinSettings::None, 3U, outputType)); const auto graph = setup.BuildGraph(pgmReturn); @@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreTupleTest) { const auto list = pb.NewList(tupleType, {data3, data4, data1, data2}); - const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType})); + const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType})); const auto pgmReturn = pb.Collect(pb.CommonJoinCore(pb.ToFlow(list), EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, {1U}, EAnyJoinSettings::None, 3U, outputType)); const auto graph = setup.BuildGraph(pgmReturn); @@ -133,7 +133,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) { const auto list = pb.NewList(tupleType, {data1, data3, data2, data4}); - const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType})); + const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType})); const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.CommonJoinCore(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U), pb.Nth(item, 3U)}; }), EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, std::nullopt, EAnyJoinSettings::None, 3U, outputType), @@ -174,7 +174,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4}); - const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType})); + const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType})); const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.CommonJoinCore(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U), pb.Nth(item, 3U)}; }), EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, {0U}, EAnyJoinSettings::None, 3U, outputType), @@ -215,7 +215,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) { const auto list = pb.NewList(tupleType, {data3, data4, data1, data2}); - const auto outputType = pb.NewFlowType(pb.NewTupleType({optionalType, optionalType})); + const auto outputType = pb.NewFlowType(pb.NewMultiType({optionalType, optionalType})); const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.CommonJoinCore(pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U), pb.Nth(item, 3U)}; }), EJoinKind::Inner, {0U, 0U}, {1U, 1U}, {}, {2U}, 0ULL, {1U}, EAnyJoinSettings::None, 3U, outputType), @@ -262,7 +262,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4}); - const auto outputType = pb.NewFlowType(pb.NewTupleType({optStrType, optStrType})); + const auto outputType = pb.NewFlowType(pb.NewMultiType({optStrType, optStrType})); const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("ACHTUNG MINEN!"); @@ -303,7 +303,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) { const auto list = pb.NewList(tupleType, {data1, data2, data3, data4}); - const auto outputType = pb.NewFlowType(pb.NewTupleType({optStrType, optStrType})); + const auto outputType = pb.NewFlowType(pb.NewMultiType({optStrType, optStrType})); const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("ACHTUNG MINEN!"); diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp index 6adb2643c62..45f039e2099 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp @@ -628,7 +628,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) { return pb.NewTuple({pb.Nth(item, 1U)}); }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -695,7 +695,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) { return pb.NewTuple({pb.Nth(item, 1U)}); }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -768,7 +768,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) { return pb.NewTuple({pb.Nth(item, 1U)}); }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -838,7 +838,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) { return pb.NewTuple({pb.Nth(item, 1U)}); }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id) })); @@ -914,7 +914,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) { return pb.NewTuple({pb.Nth(item, 1U)}); }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); @@ -981,7 +981,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) { return pb.NewTuple({pb.Nth(item, 1U)}); }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); @@ -1048,7 +1048,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) { return pb.NewTuple({pb.Nth(item, 1U)}); }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); @@ -1118,7 +1118,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) { return pb.NewTuple({pb.Nth(item, 1U)}); }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<ui32>::Id) })); diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp index c85651f48b2..a3aae17bfdc 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp @@ -307,6 +307,10 @@ private: VisitType<TBlockType>(node); } + void Visit(TMultiType& node) override { + VisitType<TMultiType>(node); + } + void Visit(TTaggedType& node) override { VisitType<TTaggedType>(node); } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp index 685202f0b6a..8fc702bd8a4 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp @@ -410,7 +410,7 @@ TRuntimeNode CreateMapJoin(TProgramBuilder& pb, size_t vecSize, TCallable* list return pb.NewTuple({pb.Nth(item, 1U)}); }); - const auto resultType = pb.NewFlowType(pb.NewTupleType({ + const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType<char*>::Id), pb.NewDataType(NUdf::TDataType<char*>::Id), })); diff --git a/ydb/library/yql/minikql/mkql_node.cpp b/ydb/library/yql/minikql/mkql_node.cpp index ab1dd70f815..7a3f2cc3673 100644 --- a/ydb/library/yql/minikql/mkql_node.cpp +++ b/ydb/library/yql/minikql/mkql_node.cpp @@ -3,6 +3,7 @@ #include "mkql_node_cast.h" #include "mkql_node_visitor.h" #include "mkql_node_printer.h" +#include "mkql_runtime_version.h" #include <ydb/library/yql/parser/pg_catalog/catalog.h> #include <ydb/public/lib/scheme_types/scheme_type_id.h> @@ -213,6 +214,7 @@ TStringBuf TType::GetKindAsStr() const { xx(Tagged, TTaggedType) \ xx(Block, TBlockType) \ xx(Pg, TPgType) \ + xx(Multi, TMultiType) \ void TType::Accept(INodeVisitor& visitor) { switch (Kind) { @@ -1903,110 +1905,6 @@ bool TAny::Equals(const TAny& nodeToCompare) const { return Item.GetNode()->Equals(*nodeToCompare.Item.GetNode()); } -TTupleType::TTupleType(ui32 elementsCount, TType** elements, const TTypeEnvironment& env, bool validate) - : TType(EKind::Tuple, env.GetTypeOfType()) - , ElementsCount(elementsCount) - , Elements(elements) -{ - if (!validate) - return; -} - -TTupleType* TTupleType::Create(ui32 elementsCount, TType* const* elements, const TTypeEnvironment& env) { - TType** allocatedElements = nullptr; - if (elementsCount) { - allocatedElements = static_cast<TType**>(env.AllocateBuffer(elementsCount * sizeof(*allocatedElements))); - for (ui32 i = 0; i < elementsCount; ++i) { - allocatedElements[i] = elements[i]; - } - } - - return ::new(env.Allocate<TTupleType>()) TTupleType(elementsCount, allocatedElements, env); -} - -bool TTupleType::IsSameType(const TTupleType& typeToCompare) const { - if (this == &typeToCompare) - return true; - - if (ElementsCount != typeToCompare.ElementsCount) - return false; - - for (size_t index = 0; index < ElementsCount; ++index) { - if (!Elements[index]->IsSameType(*typeToCompare.Elements[index])) - return false; - } - - return true; -} - -bool TTupleType::IsConvertableTo(const TTupleType& typeToCompare, bool ignoreTagged) const { - if (this == &typeToCompare) - return true; - - if (ElementsCount != typeToCompare.ElementsCount) - return false; - - for (size_t index = 0; index < ElementsCount; ++index) { - if (!Elements[index]->IsConvertableTo(*typeToCompare.Elements[index], ignoreTagged)) - return false; - } - - return true; -} - -void TTupleType::DoUpdateLinks(const THashMap<TNode*, TNode*>& links) { - for (ui32 i = 0; i < ElementsCount; ++i) { - auto& element = Elements[i]; - auto elementIt = links.find(element); - if (elementIt != links.end()) { - TNode* newNode = elementIt->second; - Y_VERIFY_DEBUG(element->Equals(*newNode)); - element = static_cast<TType*>(newNode); - } - } -} - -TNode* TTupleType::DoCloneOnCallableWrite(const TTypeEnvironment& env) const { - bool needClone = false; - for (ui32 i = 0; i < ElementsCount; ++i) { - if (Elements[i]->GetCookie()) { - needClone = true; - break; - } - } - - if (!needClone) - return const_cast<TTupleType*>(this); - - TType** allocatedElements = nullptr; - if (ElementsCount) { - allocatedElements = static_cast<TType**>(env.AllocateBuffer(ElementsCount * sizeof(*allocatedElements))); - for (ui32 i = 0; i < ElementsCount; ++i) { - allocatedElements[i] = Elements[i]; - auto newNode = (TNode*)Elements[i]->GetCookie(); - if (newNode) { - allocatedElements[i] = static_cast<TType*>(newNode); - } - } - } - - return ::new(env.Allocate<TTupleType>()) TTupleType(ElementsCount, allocatedElements, env, false); -} - -void TTupleType::DoFreeze(const TTypeEnvironment& env) { - Y_UNUSED(env); -} - -bool TTupleType::CalculatePresortSupport() { - for (ui32 i = 0; i < ElementsCount; ++i) { - if (!Elements[i]->IsPresortSupported()) { - return false; - } - } - - return true; -} - TTupleLiteral::TTupleLiteral(TRuntimeNode* values, TTupleType* type, bool validate) : TNode(type) , Values(values) @@ -2371,6 +2269,7 @@ EValueRepresentation GetValueRepresentation(const TType* type) { case TType::EKind::Callable: case TType::EKind::EmptyList: case TType::EKind::EmptyDict: + case TType::EKind::Multi: return EValueRepresentation::Boxed; case TType::EKind::Variant: @@ -2391,5 +2290,17 @@ EValueRepresentation GetValueRepresentation(const TType* type) { } } +TArrayRef<TType* const> GetWideComponents(const TFlowType* type) { + if (RuntimeVersion > 35) { + return AS_TYPE(TMultiType, type->GetItemType())->GetElements(); + } + return AS_TYPE(TTupleType, type->GetItemType())->GetElements(); +} + +TArrayRef<TType* const> GetWideComponents(const TStreamType* type) { + MKQL_ENSURE(RuntimeVersion > 35, "Wide stream is not supported in runtime version " << RuntimeVersion); + return AS_TYPE(TMultiType, type->GetItemType())->GetElements(); +} + } } diff --git a/ydb/library/yql/minikql/mkql_node.h b/ydb/library/yql/minikql/mkql_node.h index 12eb7bf85da..fc70424d5de 100644 --- a/ydb/library/yql/minikql/mkql_node.h +++ b/ydb/library/yql/minikql/mkql_node.h @@ -149,7 +149,8 @@ class TTypeEnvironment; XX(EmptyDict, 32 + 2) \ XX(Tagged, 48 + 7) \ XX(Block, 16 + 13) \ - XX(Pg, 16 + 3) + XX(Pg, 16 + 3) \ + XX(Multi, 16 + 11) class TType : public TNode { public: @@ -256,6 +257,9 @@ using TEmptyDictType = TSingularType<TType::EKind::EmptyDict>; template <TType::EKind SingularKind> TType* GetTypeOfSingular(const TTypeEnvironment& env); +template <typename TLiteralType> +TLiteralType* GetEmptyLiteral(const TTypeEnvironment& env); + template <TType::EKind SingularKind> class TSingular : public TNode { friend class TTypeEnvironment; @@ -535,6 +539,11 @@ inline TType* GetTypeOfSingular<TType::EKind::EmptyDict>(const TTypeEnvironment& return env.GetTypeOfEmptyDict(); } +template <> +inline TTupleLiteral* GetEmptyLiteral(const TTypeEnvironment& env) { + return env.GetEmptyTuple(); +} + class TDataType : public TType { friend class TType; public: @@ -1176,16 +1185,60 @@ private: TRuntimeNode Item; }; -class TTupleType : public TType { +template<typename TDerived, TType::EKind DerivedKind> +class TTupleLikeType : public TType { friend class TType; +using TSelf = TTupleLikeType<TDerived, DerivedKind>; public: - static TTupleType* Create(ui32 elementsCount, TType* const* elements, const TTypeEnvironment& env); + static TDerived* Create(ui32 elementsCount, TType* const* elements, const TTypeEnvironment& env) { + TType **allocatedElements = nullptr; + if (elementsCount) { + allocatedElements = static_cast<TType **>(env.AllocateBuffer(elementsCount * sizeof(*allocatedElements))); + for (ui32 i = 0; i < elementsCount; ++i) { + allocatedElements[i] = elements[i]; + } + } + + return ::new (env.Allocate<TDerived>()) TDerived(elementsCount, allocatedElements, env); + } using TType::IsSameType; - bool IsSameType(const TTupleType& typeToCompare) const; + bool IsSameType(const TDerived& typeToCompare) const { + if (this == &typeToCompare) { + return true; + } + + if (ElementsCount != typeToCompare.ElementsCount) { + return false; + } + + for (size_t index = 0; index < ElementsCount; ++index) { + if (!Elements[index]->IsSameType(*typeToCompare.Elements[index])) { + return false; + } + } + + return true; + } using TType::IsConvertableTo; - bool IsConvertableTo(const TTupleType& typeToCompare, bool ignoreTagged = false) const; + bool IsConvertableTo(const TDerived& typeToCompare, bool ignoreTagged = false) const { + if (this == &typeToCompare) { + return true; + } + + if (ElementsCount != typeToCompare.GetElementsCount()) { + return false; + } + + for (size_t index = 0; index < ElementsCount; ++index) { + if (!Elements[index]->IsConvertableTo(*typeToCompare.GetElementType(index), ignoreTagged)) { + return false; + } + } + + return true; + } ui32 GetElementsCount() const { return ElementsCount; @@ -1196,19 +1249,102 @@ public: return Elements[index]; } + TArrayRef<TType* const> GetElements() const { + return TArrayRef<TType* const>(Elements, ElementsCount); + } + + protected: + TTupleLikeType(ui32 elementsCount, TType** elements, const TTypeEnvironment& env) + : TType(DerivedKind, env.GetTypeOfType()) + , ElementsCount(elementsCount) + , Elements(elements) + { + } + private: - TTupleType(ui32 elemntsCount, TType** elements, const TTypeEnvironment& env, bool validate = true); + void DoUpdateLinks(const THashMap<TNode*, TNode*>& links) { + for (ui32 i = 0; i < ElementsCount; ++i) { + auto &element = Elements[i]; + auto elementIt = links.find(element); + if (elementIt != links.end()) { + TNode* newNode = elementIt->second; + Y_VERIFY_DEBUG(element->Equals(*newNode)); + element = static_cast<TType*>(newNode); + } + } + } - void DoUpdateLinks(const THashMap<TNode*, TNode*>& links); - TNode* DoCloneOnCallableWrite(const TTypeEnvironment& env) const; - void DoFreeze(const TTypeEnvironment& env); - bool CalculatePresortSupport() override; + TNode* DoCloneOnCallableWrite(const TTypeEnvironment& env) const { + bool needClone = false; + for (ui32 i = 0; i < ElementsCount; ++i) { + if (Elements[i]->GetCookie()) { + needClone = true; + break; + } + } + + if (!needClone) { + return const_cast<TSelf*>(this); + } + + TType** allocatedElements = nullptr; + if (ElementsCount) { + allocatedElements = static_cast<TType**>(env.AllocateBuffer(ElementsCount * sizeof(*allocatedElements))); + for (ui32 i = 0; i < ElementsCount; ++i) { + allocatedElements[i] = Elements[i]; + auto newNode = (TNode *)Elements[i]->GetCookie(); + if (newNode) { + allocatedElements[i] = static_cast<TType*>(newNode); + } + } + } + + return ::new (env.Allocate<TDerived>()) TDerived(ElementsCount, allocatedElements, env); + } + + void DoFreeze(const TTypeEnvironment& env) { + Y_UNUSED(env); + } + + bool CalculatePresortSupport() override { + for (ui32 i = 0; i < ElementsCount; ++i) { + if (!Elements[i]->IsPresortSupported()) { + return false; + } + } + return true; + } private: ui32 ElementsCount; TType** Elements; }; +class TTupleType : public TTupleLikeType<TTupleType, TType::EKind::Tuple> { +private: + friend class TType; + using TBase = TTupleLikeType<TTupleType, TType::EKind::Tuple>; + friend TBase; + + TTupleType(ui32 elementsCount, TType** elements, const TTypeEnvironment& env) + : TBase(elementsCount, elements, env) + { + } +}; + +class TMultiType : public TTupleLikeType<TMultiType, TType::EKind::Multi> { +private: + friend class TType; + using TBase = TTupleLikeType<TMultiType, TType::EKind::Multi>; + friend TBase; + + TMultiType(ui32 elementsCount, TType** elements, const TTypeEnvironment& env) + : TBase(elementsCount, elements, env) + { + } +}; + + class TTupleLiteral : public TNode { friend class TNode; public: @@ -1448,6 +1584,17 @@ enum class EValueRepresentation { EValueRepresentation GetValueRepresentation(const TType* type); EValueRepresentation GetValueRepresentation(NUdf::TDataTypeId typeId); +TArrayRef<TType* const> GetWideComponents(const TFlowType* type); +TArrayRef<TType* const> GetWideComponents(const TStreamType* type); + +inline ui32 GetWideComponentsCount(const TFlowType* type) { + return (ui32)GetWideComponents(type).size(); +} + +inline ui32 GetWideComponentsCount(const TStreamType* type) { + return (ui32)GetWideComponents(type).size(); +} + template <TType::EKind SingularKind> TSingularType<SingularKind>* TSingularType<SingularKind>::Create(TTypeType* type, const TTypeEnvironment& env) { return ::new(env.Allocate<TSingularType<SingularKind>>()) TSingularType<SingularKind>(type); @@ -1512,6 +1659,5 @@ template <TType::EKind SingularKind> void TSingular<SingularKind>::DoFreeze(const TTypeEnvironment& env) { Y_UNUSED(env); } - } } diff --git a/ydb/library/yql/minikql/mkql_node_cast.cpp b/ydb/library/yql/minikql/mkql_node_cast.cpp index 84344f2cb4e..25d7aa6e0bd 100644 --- a/ydb/library/yql/minikql/mkql_node_cast.cpp +++ b/ydb/library/yql/minikql/mkql_node_cast.cpp @@ -54,6 +54,7 @@ MKQL_AS_TYPE(Flow) MKQL_AS_TYPE(Tagged) MKQL_AS_TYPE(Block) MKQL_AS_TYPE(Pg) +MKQL_AS_TYPE(Multi) MKQL_AS_VALUE(Any, Type) MKQL_AS_VALUE(Callable, Type) diff --git a/ydb/library/yql/minikql/mkql_node_printer.cpp b/ydb/library/yql/minikql/mkql_node_printer.cpp index 08f13868e05..cd036d1196c 100644 --- a/ydb/library/yql/minikql/mkql_node_printer.cpp +++ b/ydb/library/yql/minikql/mkql_node_printer.cpp @@ -359,9 +359,10 @@ namespace { WriteNewline(); } - void Visit(TTupleType& node) override { + template<typename T> + void VisitTupleLike(T& node, std::string_view name) { WriteIndentation(); - Out << "Type (Tuple) with " << node.GetElementsCount() << " elements {"; + Out << "Type (" << name << ") with " << node.GetElementsCount() << " elements {"; WriteNewline(); { @@ -387,6 +388,14 @@ namespace { WriteNewline(); } + void Visit(TTupleType& node) override { + VisitTupleLike(node, "Tuple"); + } + + void Visit(TMultiType& node) override { + VisitTupleLike(node, "Multi"); + } + void Visit(TResourceType& node) override { Y_UNUSED(node); WriteIndentation(); diff --git a/ydb/library/yql/minikql/mkql_node_serialization.cpp b/ydb/library/yql/minikql/mkql_node_serialization.cpp index 2f72ca37397..8f0fce58245 100644 --- a/ydb/library/yql/minikql/mkql_node_serialization.cpp +++ b/ydb/library/yql/minikql/mkql_node_serialization.cpp @@ -235,6 +235,23 @@ namespace { IsProcessed0 = false; } + void Visit(TMultiType& node) override { + if (node.GetCookie() != 0) { + Owner.WriteReference(node); + IsProcessed0 = true; + return; + } + + Owner.Write(TypeMarker | (char)TType::EKind::Multi); + Owner.WriteVar32(node.GetElementsCount()); + for (ui32 i = node.GetElementsCount(); i-- > 0;) { + auto elementType = node.GetElementType(i); + Owner.AddChildNode(*elementType); + } + + IsProcessed0 = false; + } + void Visit(TTaggedType& node) override { if (node.GetCookie() != 0) { Owner.WriteReference(node); @@ -617,6 +634,10 @@ namespace { Owner.RegisterReference(node); } + void Visit(TMultiType& node) override { + Owner.RegisterReference(node); + } + void Visit(TTaggedType& node) override { auto tag = node.GetTagStr(); Owner.WriteName(tag); @@ -1168,8 +1189,8 @@ namespace { return TryReadCallableType(code); case TType::EKind::Any: return 0; - case TType::EKind::Tuple: - return TryReadTupleType(); + case TType::EKind::Tuple: // and Multi + return TryReadTupleOrMultiType(); case TType::EKind::Resource: return 0; case TType::EKind::Variant: @@ -1206,7 +1227,7 @@ namespace { case TType::EKind::Any: return ReadAnyType(); case TType::EKind::Tuple: - return ReadTupleType(); + return ReadTupleOrMultiType(code); case TType::EKind::Resource: return ReadResourceType(); case TType::EKind::Variant: @@ -1273,7 +1294,7 @@ namespace { return membersCount; } - ui32 TryReadTupleType() { + ui32 TryReadTupleOrMultiType() { const ui32 elementsCount = ReadVar32(); return elementsCount; } @@ -1297,7 +1318,7 @@ namespace { return node; } - TNode* ReadTupleType() { + TNode* ReadTupleOrMultiType(char code) { const ui32 elementsCount = ReadVar32(); TStackVec<TType*> elements(elementsCount); for (ui32 i = 0; i < elementsCount; ++i) { @@ -1308,7 +1329,18 @@ namespace { elements[i] = elementType; } - auto node = TTupleType::Create(elementsCount, elements.data(), Env); + TNode* node = nullptr; + switch ((TType::EKind)(code & TypeMask)) { + case TType::EKind::Tuple: + node = TTupleType::Create(elementsCount, elements.data(), Env); + break; + case TType::EKind::Multi: + node = TMultiType::Create(elementsCount, elements.data(), Env); + break; + default: + ThrowCorrupted(); + } + Nodes.push_back(node); return node; } diff --git a/ydb/library/yql/minikql/mkql_node_visitor.cpp b/ydb/library/yql/minikql/mkql_node_visitor.cpp index cc8a8562f47..80eea7f9374 100644 --- a/ydb/library/yql/minikql/mkql_node_visitor.cpp +++ b/ydb/library/yql/minikql/mkql_node_visitor.cpp @@ -176,6 +176,11 @@ void TThrowingNodeVisitor::Visit(TBlockType& node) { ThrowUnexpectedNodeType(); } +void TThrowingNodeVisitor::Visit(TMultiType& node) { + Y_UNUSED(node); + ThrowUnexpectedNodeType(); +} + void TThrowingNodeVisitor::ThrowUnexpectedNodeType() { THROW yexception() << "Unexpected node type"; } @@ -312,6 +317,10 @@ void TEmptyNodeVisitor::Visit(TBlockType& node) { Y_UNUSED(node); } +void TEmptyNodeVisitor::Visit(TMultiType& node) { + Y_UNUSED(node); +} + void TExploringNodeVisitor::Visit(TTypeType& node) { Y_VERIFY_DEBUG(node.GetType() == &node); } @@ -494,6 +503,13 @@ void TExploringNodeVisitor::Visit(TBlockType& node) { AddChildNode(&node, *node.GetItemType()); } +void TExploringNodeVisitor::Visit(TMultiType& node) { + AddChildNode(&node, *node.GetType()); + for (ui32 i = 0, e = node.GetElementsCount(); i < e; ++i) { + AddChildNode(&node, *node.GetElementType(i)); + } +} + void TExploringNodeVisitor::AddChildNode(TNode* parent, TNode& child) { Stack->push_back(&child); diff --git a/ydb/library/yql/minikql/mkql_node_visitor.h b/ydb/library/yql/minikql/mkql_node_visitor.h index 5c4528b517c..dfd76942da5 100644 --- a/ydb/library/yql/minikql/mkql_node_visitor.h +++ b/ydb/library/yql/minikql/mkql_node_visitor.h @@ -47,6 +47,7 @@ public: virtual void Visit(TFlowType& node) = 0; virtual void Visit(TTaggedType& node) = 0; virtual void Visit(TBlockType& node) = 0; + virtual void Visit(TMultiType& node) = 0; }; class TThrowingNodeVisitor : public INodeVisitor { @@ -84,6 +85,7 @@ public: void Visit(TFlowType& node) override; void Visit(TTaggedType& node) override; void Visit(TBlockType& node) override; + void Visit(TMultiType& node) override; protected: static void ThrowUnexpectedNodeType(); @@ -124,6 +126,7 @@ public: void Visit(TFlowType& node) override; void Visit(TTaggedType& node) override; void Visit(TBlockType& node) override; + void Visit(TMultiType& node) override; }; class TExploringNodeVisitor : public INodeVisitor { @@ -163,6 +166,7 @@ public: void Visit(TFlowType& node) override; void Visit(TTaggedType& node) override; void Visit(TBlockType& node) override; + void Visit(TMultiType& node) override; void Walk(TNode* root, const TTypeEnvironment& env, const std::vector<TNode*>& terminalNodes = std::vector<TNode*>(), bool buildConsumersMap = false, size_t nodesCountHint = 0); diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 67d8f79d9f9..ee264e8a178 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -227,13 +227,13 @@ bool ReduceOptionalElements(const TType* type, const TArrayRef<const ui32>& test } std::vector<TType*> ValidateBlockFlowType(const TType* flowType) { - const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flowType)->GetItemType()); - MKQL_ENSURE(inputTupleType->GetElementsCount() > 0, "Expected at least one column"); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flowType)); + MKQL_ENSURE(wideComponents.size() > 0, "Expected at least one column"); std::vector<TType*> flowItems; - flowItems.reserve(inputTupleType->GetElementsCount()); + flowItems.reserve(wideComponents.size()); bool isScalar; - for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) { - auto blockType = AS_TYPE(TBlockType, inputTupleType->GetElementType(i)); + for (size_t i = 0; i < wideComponents.size(); ++i) { + auto blockType = AS_TYPE(TBlockType, wideComponents[i]); isScalar = blockType->GetShape() == TBlockType::EShape::Scalar; auto withoutBlock = blockType->GetItemType(); flowItems.push_back(withoutBlock); @@ -1440,19 +1440,19 @@ TRuntimeNode TProgramBuilder::ToBlocks(TRuntimeNode flow) { } TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode flow) { - TType* outputTupleType; + TType* outputItemType; { - const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); - std::vector<TType*> outputTupleItems; - outputTupleItems.reserve(inputTupleType->GetElementsCount()); - for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) { - outputTupleItems.push_back(NewBlockType(inputTupleType->GetElementType(i), TBlockType::EShape::Many)); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); + std::vector<TType*> outputItems; + outputItems.reserve(wideComponents.size()); + for (size_t i = 0; i < wideComponents.size(); ++i) { + outputItems.push_back(NewBlockType(wideComponents[i], TBlockType::EShape::Many)); } - outputTupleItems.push_back(NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); - outputTupleType = NewTupleType(outputTupleItems); + outputItems.push_back(NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); + outputItemType = NewMultiType(outputItems); } - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputTupleType)); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputItemType)); callableBuilder.Add(flow); return TRuntimeNode(callableBuilder.Build(), false); } @@ -1467,10 +1467,10 @@ TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) { } TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode flow) { - auto outputTupleItems = ValidateBlockFlowType(flow.GetStaticType()); - outputTupleItems.pop_back(); - TType* outputTupleType = NewTupleType(outputTupleItems); - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputTupleType)); + auto outputItems = ValidateBlockFlowType(flow.GetStaticType()); + outputItems.pop_back(); + TType* outputMultiType = NewMultiType(outputItems); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputMultiType)); callableBuilder.Add(flow); return TRuntimeNode(callableBuilder.Build(), false); } @@ -1509,17 +1509,17 @@ TRuntimeNode TProgramBuilder::BlockCompress(TRuntimeNode flow, ui32 bitmapIndex) MKQL_ENSURE(AS_TYPE(TDataType, blockItemTypes[bitmapIndex])->GetSchemeType() == NUdf::TDataType<bool>::Id, "Expected Bool as bitmap column type"); - const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); - MKQL_ENSURE(inputTupleType->GetElementsCount() == blockItemTypes.size(), "Unexpected tuple size"); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); + MKQL_ENSURE(wideComponents.size() == blockItemTypes.size(), "Unexpected tuple size"); std::vector<TType*> flowItems; - for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) { + for (size_t i = 0; i < wideComponents.size(); ++i) { if (i == bitmapIndex) { continue; } - flowItems.push_back(inputTupleType->GetElementType(i)); + flowItems.push_back(wideComponents[i]); } - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(flowItems))); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(flowItems))); callableBuilder.Add(flow); callableBuilder.Add(NewDataLiteral<ui32>(bitmapIndex)); return TRuntimeNode(callableBuilder.Build(), false); @@ -1699,7 +1699,7 @@ TRuntimeNode TProgramBuilder::BuildWideTopOrSort(const std::string_view& callabl } } - const auto width = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType())->GetElementsCount(); + const auto width = GetWideComponentsCount(AS_TYPE(TFlowType, flow.GetStaticType())); MKQL_ENSURE(!keys.empty() && keys.size() <= width, "Unexpected keys count: " << keys.size()); TCallableBuilder callableBuilder(Env, callableName, flow.GetStaticType()); @@ -2307,6 +2307,20 @@ TRuntimeNode TProgramBuilder::NewTuple(const TArrayRef<const TRuntimeNode>& elem return NewTuple(NewTupleType(types), elements); } +TType* TProgramBuilder::NewEmptyMultiType() { + if (RuntimeVersion > 35) { + return TMultiType::Create(0, nullptr, Env); + } + return Env.GetEmptyTuple()->GetGenericType(); +} + +TType* TProgramBuilder::NewMultiType(const TArrayRef<TType* const>& elements) { + if (RuntimeVersion > 35) { + return TMultiType::Create(elements.size(), elements.data(), Env); + } + return TTupleType::Create(elements.size(), elements.data(), Env); +} + TType* TProgramBuilder::NewResourceType(const std::string_view& tag) { return TResourceType::Create(tag, Env); } @@ -2799,7 +2813,7 @@ TRuntimeNode TProgramBuilder::Source() { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType({}))); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType({}))); return TRuntimeNode(callableBuilder.Build(), false); } @@ -3402,12 +3416,12 @@ TRuntimeNode TProgramBuilder::NarrowSqueezeToDict(TRuntimeNode flow, bool multi, THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs; - itemArgs.reserve(tupleType->GetElementsCount()); + itemArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto key = keySelector(itemArgs); const auto keyType = key.GetStaticType(); @@ -3524,12 +3538,12 @@ TRuntimeNode TProgramBuilder::NarrowMultiMap(TRuntimeNode flow, const TWideLambd THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs; - itemArgs.reserve(tupleType->GetElementsCount()); + itemArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto newList = handler(itemArgs); @@ -3557,7 +3571,7 @@ TRuntimeNode TProgramBuilder::ExpandMap(TRuntimeNode flow, const TExpandLambda& tupleItems.reserve(newItems.size()); std::transform(newItems.cbegin(), newItems.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1)); - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems))); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems))); callableBuilder.Add(flow); callableBuilder.Add(itemArg); std::for_each(newItems.cbegin(), newItems.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); @@ -3569,12 +3583,12 @@ TRuntimeNode TProgramBuilder::WideMap(TRuntimeNode flow, const TWideLambda& hand THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs; - itemArgs.reserve(tupleType->GetElementsCount()); + itemArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto newItems = handler(itemArgs); @@ -3582,7 +3596,7 @@ TRuntimeNode TProgramBuilder::WideMap(TRuntimeNode flow, const TWideLambda& hand tupleItems.reserve(newItems.size()); std::transform(newItems.cbegin(), newItems.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1)); - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems))); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems))); callableBuilder.Add(flow); std::for_each(itemArgs.cbegin(), itemArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); std::for_each(newItems.cbegin(), newItems.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); @@ -3594,12 +3608,12 @@ TRuntimeNode TProgramBuilder::WideChain1Map(TRuntimeNode flow, const TWideLambda THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList inputArgs; - inputArgs.reserve(tupleType->GetElementsCount()); + inputArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(inputArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(inputArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto initItems = init(inputArgs); @@ -3615,7 +3629,7 @@ TRuntimeNode TProgramBuilder::WideChain1Map(TRuntimeNode flow, const TWideLambda MKQL_ENSURE(initItems.size() == updateItems.size(), "Expected same width."); - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems))); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems))); callableBuilder.Add(flow); std::for_each(inputArgs.cbegin(), inputArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); std::for_each(initItems.cbegin(), initItems.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); @@ -3629,12 +3643,12 @@ TRuntimeNode TProgramBuilder::NarrowMap(TRuntimeNode flow, const TNarrowLambda& THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs; - itemArgs.reserve(tupleType->GetElementsCount()); + itemArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto newItem = handler(itemArgs); @@ -3650,12 +3664,12 @@ TRuntimeNode TProgramBuilder::NarrowFlatMap(TRuntimeNode flow, const TNarrowLamb THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs; - itemArgs.reserve(tupleType->GetElementsCount()); + itemArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto newList = handler(itemArgs); const auto type = newList.GetStaticType(); @@ -3685,12 +3699,12 @@ TRuntimeNode TProgramBuilder::BuildWideFilter(const std::string_view& callableNa THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs; - itemArgs.reserve(tupleType->GetElementsCount()); + itemArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto predicate = handler(itemArgs); @@ -3722,12 +3736,12 @@ TRuntimeNode TProgramBuilder::WideSkipWhileInclusive(TRuntimeNode flow, const TN } TRuntimeNode TProgramBuilder::WideFilter(TRuntimeNode flow, TRuntimeNode limit, const TNarrowLambda& handler) { - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs; - itemArgs.reserve(tupleType->GetElementsCount()); + itemArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto predicate = handler(itemArgs); @@ -4543,13 +4557,13 @@ TRuntimeNode TProgramBuilder::WideCombiner(TRuntimeNode flow, ui64 memLimit, con THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs; - itemArgs.reserve(tupleType->GetElementsCount()); + itemArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto keys = extractor(itemArgs); @@ -4580,7 +4594,7 @@ TRuntimeNode TProgramBuilder::WideCombiner(TRuntimeNode flow, ui64 memLimit, con tupleItems.reserve(output.size()); std::transform(output.cbegin(), output.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1)); - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems))); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems))); callableBuilder.Add(flow); callableBuilder.Add(NewDataLiteral(memLimit)); callableBuilder.Add(NewDataLiteral(ui32(keyArgs.size()))); @@ -4602,13 +4616,13 @@ TRuntimeNode TProgramBuilder::WideLastCombiner(TRuntimeNode flow, const TWideLam THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs; - itemArgs.reserve(tupleType->GetElementsCount()); + itemArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto keys = extractor(itemArgs); @@ -4639,7 +4653,7 @@ TRuntimeNode TProgramBuilder::WideLastCombiner(TRuntimeNode flow, const TWideLam tupleItems.reserve(output.size()); std::transform(output.cbegin(), output.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1)); - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems))); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems))); callableBuilder.Add(flow); callableBuilder.Add(NewDataLiteral(ui32(keyArgs.size()))); callableBuilder.Add(NewDataLiteral(ui32(stateArgs.size()))); @@ -4660,13 +4674,13 @@ TRuntimeNode TProgramBuilder::WideCondense1(TRuntimeNode flow, const TWideLambda THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs; - itemArgs.reserve(tupleType->GetElementsCount()); + itemArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto first = init(itemArgs); @@ -4683,7 +4697,7 @@ TRuntimeNode TProgramBuilder::WideCondense1(TRuntimeNode flow, const TWideLambda tupleItems.reserve(next.size()); std::transform(next.cbegin(), next.cend(), std::back_inserter(tupleItems), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1)); - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewTupleType(tupleItems))); + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType(tupleItems))); callableBuilder.Add(flow); std::for_each(itemArgs.cbegin(), itemArgs.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); std::for_each(first.cbegin(), first.cend(), std::bind(&TCallableBuilder::Add, std::ref(callableBuilder), std::placeholders::_1)); @@ -4845,13 +4859,13 @@ TRuntimeNode TProgramBuilder::WideChopper(TRuntimeNode flow, const TWideLambda& THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - const auto tupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType())); TRuntimeNode::TList itemArgs, keyArgs; - itemArgs.reserve(tupleType->GetElementsCount()); + itemArgs.reserve(wideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), tupleType->GetElementsCount(), [&](){ return Arg(tupleType->GetElementType(i++)); }); + std::generate_n(std::back_inserter(itemArgs), wideComponents.size(), [&](){ return Arg(wideComponents[i++]); }); const auto keys = extractor(itemArgs); diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index d20e1adb1cc..5c6348bd2b2 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -194,6 +194,10 @@ public: TRuntimeNode NewEmptyTuple(); TRuntimeNode NewTuple(TType* tupleType, const TArrayRef<const TRuntimeNode>& elements); TRuntimeNode NewTuple(const TArrayRef<const TRuntimeNode>& elements); + + TType* NewEmptyMultiType(); + TType* NewMultiType(const TArrayRef<TType* const>& elements); + TType* NewResourceType(const std::string_view& tag); TType* NewVariantType(TType* underlyingType); TRuntimeNode NewVariant(TRuntimeNode item, ui32 tupleIndex, TType* variantType); diff --git a/ydb/library/yql/providers/common/mkql/yql_type_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_type_mkql.cpp index 5a2123e13fc..b86b9fba726 100644 --- a/ydb/library/yql/providers/common/mkql/yql_type_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_type_mkql.cpp @@ -92,7 +92,7 @@ NKikimr::NMiniKQL::TType* BuildType(const TTypeAnnotationNode& annotation, NKiki return nullptr; } } - return pgmBuilder.NewTupleType(elements); + return pgmBuilder.NewMultiType(elements); } case ETypeAnnotationKind::Dict: { @@ -330,6 +330,7 @@ const TTypeAnnotationNode* ConvertMiniKQLType(TPosition position, NKikimr::NMini } case TType::EKind::Any: + case TType::EKind::ReservedKind: YQL_ENSURE(false, "Not supported"); break; @@ -393,9 +394,36 @@ const TTypeAnnotationNode* ConvertMiniKQLType(TPosition position, NKikimr::NMini } } - default: - YQL_ENSURE(false, "Unknown kind"); + case TType::EKind::Flow: + { + auto flowType = static_cast<TFlowType*>(type); + auto itemType = ConvertMiniKQLType(position, flowType->GetItemType(), ctx); + return ctx.MakeType<TFlowExprType>(itemType); + } + + case TType::EKind::Pg: + { + auto pgType = static_cast<TPgType*>(type); + return ctx.MakeType<TPgExprType>(pgType->GetTypeId()); } + + case TType::EKind::Multi: + { + TTypeAnnotationNode::TListType elements; + auto multiType = static_cast<TMultiType*>(type); + for (ui32 index = 0; index < multiType->GetElementsCount(); ++index) { + auto elementType = ConvertMiniKQLType(position, multiType->GetElementType(index), ctx); + elements.push_back(elementType); + } + + auto res = ctx.MakeType<TMultiExprType>(elements); + YQL_ENSURE(res->Validate(position, ctx)); + return res; + } + + } + + YQL_ENSURE(false, "Unknown kind"); } } // namespace NCommon |