diff options
author | ziganshinmr <[email protected]> | 2025-03-31 15:50:04 +0300 |
---|---|---|
committer | ziganshinmr <[email protected]> | 2025-03-31 16:08:18 +0300 |
commit | 30647eeae1a5f0019fd1b41b2a2835221e49ed46 (patch) | |
tree | 6890b601da77233fe0af248b034a635fef338a72 | |
parent | 06c199b566aca570faf8661b96ea873a229f923a (diff) |
YT block table content refactor
* Migrate YtBlockTableContent node to block struct type
* Use ListFromBlock to resolve problems with multipage
* Fix problems with Arrow decoder related to multiple files read
commit_hash:4a2f5dd57c1359b17a9161ccb5c88754125609cb
27 files changed, 280 insertions, 143 deletions
diff --git a/yql/essentials/core/expr_nodes/yql_expr_nodes.json b/yql/essentials/core/expr_nodes/yql_expr_nodes.json index 7eb01b6114c..c051517eee9 100644 --- a/yql/essentials/core/expr_nodes/yql_expr_nodes.json +++ b/yql/essentials/core/expr_nodes/yql_expr_nodes.json @@ -2557,6 +2557,11 @@ "Match": {"Type": "Callable", "Name": "WideFromBlocks"} }, { + "Name": "TCoListFromBlocks", + "Base": "TCoInputBase", + "Match": {"Type": "Callable", "Name": "ListFromBlocks"} + }, + { "Name": "TCoReplicateScalars", "Base": "TCoInputBase", "Match": {"Type": "Callable", "Name": "ReplicateScalars"}, @@ -2570,6 +2575,11 @@ "Match": {"Type": "Callable", "Name": "WideToBlocks"} }, { + "Name": "TCoListToBlocks", + "Base": "TCoInputBase", + "Match": {"Type": "Callable", "Name": "ListToBlocks"} + }, + { "Name": "TCoPgSelect", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "PgSelect"}, diff --git a/yql/essentials/core/type_ann/type_ann_blocks.cpp b/yql/essentials/core/type_ann/type_ann_blocks.cpp index c674d61ed5c..20f09c46961 100644 --- a/yql/essentials/core/type_ann/type_ann_blocks.cpp +++ b/yql/essentials/core/type_ann/type_ann_blocks.cpp @@ -906,6 +906,43 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx return IGraphTransformer::TStatus::Ok; } +IGraphTransformer::TStatus ListToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { + Y_UNUSED(output); + if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureListType(input->Head(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + const auto listItemType = input->Head().GetTypeAnn()->Cast<TListExprType>()->GetItemType(); + if (!EnsureStructType(input->Head().Pos(), *listItemType, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + const auto structType = listItemType->Cast<TStructExprType>(); + + TVector<const TItemExprType*> outputStructItems; + for (auto item : structType->GetItems()) { + auto itemType = item->GetItemType(); + if (itemType->IsBlockOrScalar()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Input type should not be a block or scalar")); + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureSupportedAsBlockType(input->Pos(), *itemType, ctx.Expr, ctx.Types)) { + return IGraphTransformer::TStatus::Error; + } + + outputStructItems.push_back(ctx.Expr.MakeType<TItemExprType>(item->GetName(), ctx.Expr.MakeType<TBlockExprType>(itemType))); + } + outputStructItems.push_back(ctx.Expr.MakeType<TItemExprType>(BlockLengthColumnName, ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)))); + + auto outputStructType = ctx.Expr.MakeType<TStructExprType>(outputStructItems); + input->SetTypeAnn(ctx.Expr.MakeType<TListExprType>(outputStructType)); + return IGraphTransformer::TStatus::Ok; +} + IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { Y_UNUSED(output); if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { @@ -924,6 +961,22 @@ IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, T return IGraphTransformer::TStatus::Ok; } +IGraphTransformer::TStatus ListFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + if (!EnsureArgsCount(*input, 1U, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TVector<const TItemExprType*> outputStructItems; + if (!EnsureBlockListType(input->Head(), outputStructItems, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + auto outputStructType = ctx.Expr.MakeType<TStructExprType>(outputStructItems); + input->SetTypeAnn(ctx.Expr.MakeType<TListExprType>(outputStructType)); + return IGraphTransformer::TStatus::Ok; +} + IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { if (!EnsureArgsCount(*input, 2U, ctx.Expr)) { return IGraphTransformer::TStatus::Error; diff --git a/yql/essentials/core/type_ann/type_ann_blocks.h b/yql/essentials/core/type_ann/type_ann_blocks.h index 8a16376f9b7..223758ef7fc 100644 --- a/yql/essentials/core/type_ann/type_ann_blocks.h +++ b/yql/essentials/core/type_ann/type_ann_blocks.h @@ -28,7 +28,9 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); + IGraphTransformer::TStatus ListToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus ListFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus WideTopBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus WideSortBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); diff --git a/yql/essentials/core/type_ann/type_ann_core.cpp b/yql/essentials/core/type_ann/type_ann_core.cpp index 75e07ab3a72..cd6bd71608b 100644 --- a/yql/essentials/core/type_ann/type_ann_core.cpp +++ b/yql/essentials/core/type_ann/type_ann_core.cpp @@ -12983,6 +12983,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["NarrowMultiMap"] = &NarrowMultiMapWrapper; Functions["WideFromBlocks"] = &WideFromBlocksWrapper; + Functions["ListFromBlocks"] = &ListFromBlocksWrapper; Functions["WideSkipBlocks"] = &WideSkipTakeBlocksWrapper; Functions["WideTakeBlocks"] = &WideSkipTakeBlocksWrapper; Functions["BlockCompress"] = &BlockCompressWrapper; @@ -13018,6 +13019,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> ExtFunctions["AsScalar"] = &AsScalarWrapper; ExtFunctions["WideToBlocks"] = &WideToBlocksWrapper; + ExtFunctions["ListToBlocks"] = &ListToBlocksWrapper; ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper; ExtFunctions["BlockCombineHashed"] = &BlockCombineHashedWrapper; ExtFunctions["BlockMergeFinalizeHashed"] = &BlockMergeFinalizeHashedWrapper; diff --git a/yql/essentials/core/yql_expr_constraint.cpp b/yql/essentials/core/yql_expr_constraint.cpp index 32cba8f9d28..fdfc21946be 100644 --- a/yql/essentials/core/yql_expr_constraint.cpp +++ b/yql/essentials/core/yql_expr_constraint.cpp @@ -245,7 +245,9 @@ public: Functions["WideTopSortBlocks"] = &TCallableConstraintTransformer::WideTopWrap<true>; Functions["WideSortBlocks"] = &TCallableConstraintTransformer::WideTopWrap<true>; Functions["WideToBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>; + Functions["ListToBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>; Functions["WideFromBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>; + Functions["ListFromBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>; Functions["ReplicateScalars"] = &TCallableConstraintTransformer::CopyAllFrom<0>; Functions["BlockMergeFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>; Functions["BlockMergeManyFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>; diff --git a/yql/essentials/core/yql_expr_type_annotation.cpp b/yql/essentials/core/yql_expr_type_annotation.cpp index 80207d35bc2..16e29a60c8e 100644 --- a/yql/essentials/core/yql_expr_type_annotation.cpp +++ b/yql/essentials/core/yql_expr_type_annotation.cpp @@ -10,6 +10,7 @@ #include <yql/essentials/minikql/dom/json.h> #include <yql/essentials/minikql/dom/yson.h> #include <yql/essentials/minikql/jsonpath/parser/parser.h> +#include <yql/essentials/core/sql_types/block.h> #include <yql/essentials/core/sql_types/simple_types.h> #include "yql/essentials/parser/pg_catalog/catalog.h" #include <yql/essentials/parser/pg_wrapper/interface/utils.h> @@ -3269,6 +3270,52 @@ bool EnsureWideBlockType(TPositionHandle position, const TTypeAnnotationNode& ty return true; } +bool EnsureBlockStructType(TPositionHandle position, const TTypeAnnotationNode& type, TVector<const TItemExprType*>& structItems, TExprContext& ctx) { + if (HasError(&type, ctx)) { + return false; + } + + if (type.GetKind() != ETypeAnnotationKind::Struct) { + ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Expected struct, but got: " << type)); + return false; + } + + auto& items = type.Cast<TStructExprType>()->GetItems(); + if (items.empty()) { + ctx.AddError(TIssue(ctx.GetPosition(position), "Expected at least one column")); + return false; + } + + bool hasBlockLengthColumn = false; + for (auto item : items) { + auto blockType = item->GetItemType(); + if (!EnsureBlockOrScalarType(position, *blockType, ctx)) { + return false; + } + + bool isScalar = false; + auto itemType = GetBlockItemType(*blockType, isScalar); + + if (item->GetName() == BlockLengthColumnName) { + if (!isScalar) { + ctx.AddError(TIssue(ctx.GetPosition(position), "Block length column should be a scalar")); + return false; + } + if (!EnsureSpecificDataType(position, *itemType, EDataSlot::Uint64, ctx)) { + return false; + } + hasBlockLengthColumn = true; + } else { + structItems.push_back(ctx.MakeType<TItemExprType>(item->GetName(), itemType)); + } + } + if (!hasBlockLengthColumn) { + ctx.AddError(TIssue(ctx.GetPosition(position), "Block struct must contain block length column")); + return false; + } + return true; +} + bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) { if (!EnsureWideFlowType(node, ctx)) { return false; @@ -3285,6 +3332,14 @@ bool EnsureWideStreamBlockType(const TExprNode& node, TTypeAnnotationNode::TList return EnsureWideBlockType(node.Pos(), *node.GetTypeAnn()->Cast<TStreamExprType>()->GetItemType(), blockItemTypes, ctx, allowScalar); } +bool EnsureBlockListType(const TExprNode& node, TVector<const TItemExprType*>& structItems, TExprContext& ctx) { + if (!EnsureListType(node, ctx)) { + return false; + } + + return EnsureBlockStructType(node.Pos(), *node.GetTypeAnn()->Cast<TListExprType>()->GetItemType(), structItems, ctx); +} + bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx) { if (!node.GetTypeAnn()) { YQL_ENSURE(node.Type() == TExprNode::Lambda); diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h index b429f46f394..8538c45ab18 100644 --- a/yql/essentials/core/yql_expr_type_annotation.h +++ b/yql/essentials/core/yql_expr_type_annotation.h @@ -134,8 +134,10 @@ bool IsWideSequenceBlockType(const TTypeAnnotationNode& type); bool IsSupportedAsBlockType(TPositionHandle pos, const TTypeAnnotationNode& type, TExprContext& ctx, TTypeAnnotationContext& types, bool reportUnspported = false); bool EnsureSupportedAsBlockType(TPositionHandle pos, const TTypeAnnotationNode& type, TExprContext& ctx, TTypeAnnotationContext& types); bool EnsureWideBlockType(TPositionHandle position, const TTypeAnnotationNode& type, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true); +bool EnsureBlockStructType(TPositionHandle position, const TTypeAnnotationNode& type, TVector<const TItemExprType*>& structItems, TExprContext& ctx); bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true); bool EnsureWideStreamBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true); +bool EnsureBlockListType(const TExprNode& node, TVector<const TItemExprType*>& structItems, TExprContext& ctx); bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx); bool EnsureOptionalType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx); bool EnsureType(const TExprNode& node, TExprContext& ctx); diff --git a/yql/essentials/core/yql_type_annotation.cpp b/yql/essentials/core/yql_type_annotation.cpp index 8934949d3fe..6de6ecf8bdf 100644 --- a/yql/essentials/core/yql_type_annotation.cpp +++ b/yql/essentials/core/yql_type_annotation.cpp @@ -303,7 +303,7 @@ IGraphTransformer::TStatus TTypeAnnotationContext::SetColumnOrder(const TExprNod allColumns.erase(it); } - if (!allColumns.empty()) { + if (!allColumns.empty() && !(allColumns.size() == 1 && *allColumns.begin() == BlockLengthColumnName)) { ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Some columns are left unordered with column order " << FormatColumnOrder(columnOrder) << " for node " << node.Content() << " with type: " << *node.GetTypeAnn())); diff --git a/yql/essentials/minikql/mkql_type_builder.cpp b/yql/essentials/minikql/mkql_type_builder.cpp index c9d6e363b4f..41502b144ae 100644 --- a/yql/essentials/minikql/mkql_type_builder.cpp +++ b/yql/essentials/minikql/mkql_type_builder.cpp @@ -2820,7 +2820,6 @@ TType* TTypeBuilder::ValidateBlockStructType(const TStructType* structType) cons MKQL_ENSURE(isScalar, "Block length column should be scalar"); MKQL_ENSURE(AS_TYPE(TDataType, itemType)->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected Uint64"); - MKQL_ENSURE(!hasBlockLengthColumn, "Block struct must contain only one block length column"); hasBlockLengthColumn = true; } else { outStructItems.emplace_back(structType->GetMemberName(i), itemType); diff --git a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp index 983bf855424..abe2f9c1e9a 100644 --- a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp +++ b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp @@ -448,7 +448,9 @@ TMkqlCommonCallableCompiler::TShared::TShared() { {"FromFlow", &TProgramBuilder::FromFlow}, {"WideToBlocks", &TProgramBuilder::WideToBlocks}, + {"ListToBlocks", &TProgramBuilder::ListToBlocks}, {"WideFromBlocks", &TProgramBuilder::WideFromBlocks}, + {"ListFromBlocks", &TProgramBuilder::ListFromBlocks}, {"AsScalar", &TProgramBuilder::AsScalar}, {"Just", &TProgramBuilder::NewOptional}, diff --git a/yt/yql/providers/yt/codec/yt_codec.cpp b/yt/yql/providers/yt/codec/yt_codec.cpp index 6e64136937f..14e317df97f 100644 --- a/yt/yql/providers/yt/codec/yt_codec.cpp +++ b/yt/yql/providers/yt/codec/yt_codec.cpp @@ -305,6 +305,13 @@ void TMkqlIOSpecs::InitDecoder(NCommon::TCodecContext& codecCtx, } } + if (InputBlockRepresentation_ == EBlockRepresentation::BlockStruct) { + if (auto pos = rowType->FindMemberIndex(BlockLengthColumnName)) { + virtualColumns.insert(*pos); + decoder.FillBlockStructSize = pos; + } + } + THashSet<ui32> usedPos; for (ui32 index = 0; index < rowType->GetMembersCount(); ++index) { auto name = rowType->GetMemberNameStr(index); @@ -444,6 +451,7 @@ void TMkqlIOSpecs::InitInput(NCommon::TCodecContext& codecCtx, TSpecInfo localSpecInfo; TSpecInfo* specInfo = &localSpecInfo; TString decoderRefName = TStringBuilder() << "_internal" << inputIndex; + bool newSpec = false; if (inputSpecs[inputIndex].IsString()) { auto refName = inputSpecs[inputIndex].AsString(); decoderRefName = refName; @@ -453,9 +461,14 @@ void TMkqlIOSpecs::InitInput(NCommon::TCodecContext& codecCtx, Y_ENSURE(inAttrs.HasKey(YqlIOSpecRegistry) && inAttrs[YqlIOSpecRegistry].HasKey(refName), "Bad input registry reference: " << refName); specInfo = &specInfoRegistry[refName]; LoadSpecInfo(true, inAttrs[YqlIOSpecRegistry][refName], codecCtx, *specInfo); + newSpec = true; } } else { LoadSpecInfo(true, inputSpecs[inputIndex], codecCtx, localSpecInfo); + newSpec = true; + } + if (InputBlockRepresentation_ == EBlockRepresentation::BlockStruct && newSpec) { + specInfo->Type = codecCtx.Builder.NewStructType(specInfo->Type, BlockLengthColumnName, TDataType::Create(NUdf::TDataType<ui64>::Id, codecCtx.Env)); } TStructType* inStruct = AS_TYPE(TStructType, specInfo->Type); diff --git a/yt/yql/providers/yt/codec/yt_codec.h b/yt/yql/providers/yt/codec/yt_codec.h index 4e6ef543a5f..ed309564799 100644 --- a/yt/yql/providers/yt/codec/yt_codec.h +++ b/yt/yql/providers/yt/codec/yt_codec.h @@ -31,6 +31,12 @@ public: Y_DECLARE_FLAGS(TSystemFields, ESystemField); + enum class EBlockRepresentation { + None, + WideBlock, + BlockStruct, + }; + struct TSpecInfo { NKikimr::NMiniKQL::TType* Type = nullptr; bool StrictSchema = true; @@ -65,6 +71,7 @@ public: TMaybe<ui32> FillSysColumnIndex; TMaybe<ui32> FillSysColumnNum; TMaybe<ui32> FillSysColumnKeySwitch; + TMaybe<ui32> FillBlockStructSize; }; struct TEncoderSpec { @@ -137,6 +144,10 @@ public: IsTableContent_ = true; } + void SetInputBlockRepresentation(EBlockRepresentation type) { + InputBlockRepresentation_ = type; + } + void SetTableOffsets(const TVector<ui64>& offsets); void Clear(); @@ -156,6 +167,8 @@ public: TString OptLLVM_; TSystemFields SystemFields_; + EBlockRepresentation InputBlockRepresentation_ = EBlockRepresentation::None; + NKikimr::NMiniKQL::IStatsRegistry* JobStats_ = nullptr; THashMap<TString, TDecoderSpec> Decoders; TVector<const TDecoderSpec*> Inputs; diff --git a/yt/yql/providers/yt/codec/yt_codec_io.cpp b/yt/yql/providers/yt/codec/yt_codec_io.cpp index 0a6b31f4e77..a46ecc0f680 100644 --- a/yt/yql/providers/yt/codec/yt_codec_io.cpp +++ b/yt/yql/providers/yt/codec/yt_codec_io.cpp @@ -651,7 +651,7 @@ struct TMkqlReaderImpl::TDecoder { KeySwitch_ = false; } - void Reset(bool hasRangeIndices, ui32 tableIndex, bool ignoreStreamTableIndex) { + virtual void Reset(bool hasRangeIndices, ui32 tableIndex, bool ignoreStreamTableIndex) { HasRangeIndices_ = hasRangeIndices; TableIndex_ = tableIndex; AtStart_ = true; @@ -1463,7 +1463,7 @@ public: , Pool_(pool) { InputStream_ = std::make_unique<TInputBufArrowInputStream>(buf, pool); - ResetColumnConverters(); + HandleTableSwitch(); HandlesSysColumns_ = true; } @@ -1482,14 +1482,19 @@ public: YQL_ENSURE(!Chunks_.empty()); } + bool isWideBlock = (Specs_.InputBlockRepresentation_ == TMkqlIOSpecs::EBlockRepresentation::WideBlock); + auto& decoder = *Specs_.Inputs[TableIndex_]; - Row_ = SpecsCache_.NewRow(TableIndex_, items, true); + Row_ = SpecsCache_.NewRow(TableIndex_, items, isWideBlock); auto& [chunkRowIndex, chunkLen, chunk] = Chunks_.front(); for (size_t i = 0; i < decoder.StructSize; i++) { + if (i == decoder.FillBlockStructSize) { + continue; + } items[i] = SpecsCache_.GetHolderFactory().CreateArrowBlock(std::move(chunk[i])); } - items[decoder.StructSize] = SpecsCache_.GetHolderFactory().CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(chunkLen))); + items[BlockSizeStructIndex_] = SpecsCache_.GetHolderFactory().CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(chunkLen))); RowIndex_ = chunkRowIndex; Chunks_.pop_front(); @@ -1505,17 +1510,17 @@ public: } StreamReader_ = ARROW_RESULT(streamReaderResult); - auto oldTableIndex = TableIndex_; if (!IgnoreStreamTableIndex) { + auto oldTableIndex = TableIndex_; auto tableIdKey = StreamReader_->schema()->metadata()->Get("TableId"); if (tableIdKey.ok()) { TableIndex_ = std::stoi(tableIdKey.ValueOrDie()); YQL_ENSURE(TableIndex_ < Specs_.Inputs.size()); } - } - if (TableIndex_ != oldTableIndex) { - ResetColumnConverters(); + if (TableIndex_ != oldTableIndex) { + HandleTableSwitch(); + } } } @@ -1523,6 +1528,8 @@ public: ARROW_OK(StreamReader_->ReadNext(&batch)); if (!batch) { if (InputStream_->EOSReached()) { + // Prepare for possible table switch + StreamReader_.reset(); return false; } @@ -1565,6 +1572,9 @@ public: } } else if (decoder.FillSysColumnIndex == inputFields[i].StructIndex) { convertedColumn = ARROW_RESULT(arrow::MakeArrayFromScalar(arrow::UInt32Scalar(TableIndex_), batch->num_rows())); + } else if (decoder.FillBlockStructSize == inputFields[i].StructIndex) { + // Actual value will be specified later + convertedColumn = arrow::Datum(static_cast<uint64_t>(0)); } else if (inputFields[i].StructIndex == Max<ui32>()) { // Input field won't appear in the result continue; @@ -1593,14 +1603,22 @@ public: return true; } - void ResetColumnConverters() { - auto& fields = Specs_.Inputs[TableIndex_]->FieldsVec; + void HandleTableSwitch() { + auto& decoder = Specs_.Inputs[TableIndex_]; + ColumnConverters_.clear(); - ColumnConverters_.reserve(fields.size()); - for (auto& field: fields) { + ColumnConverters_.reserve(decoder->FieldsVec.size()); + for (auto& field: decoder->FieldsVec) { YQL_ENSURE(!field.Type->IsPg()); ColumnConverters_.emplace_back(MakeYtColumnConverter(field.Type, nullptr, *Pool_, Specs_.Inputs[TableIndex_]->NativeYtTypeFlags)); } + + BlockSizeStructIndex_ = GetBlockSizeStructIndex(Specs_, TableIndex_); + } + + void Reset(bool hasRangeIndices, ui32 tableIndex, bool ignoreStreamTableIndex) override { + TDecoder::Reset(hasRangeIndices, tableIndex, ignoreStreamTableIndex); + HandleTableSwitch(); } private: @@ -1610,6 +1628,8 @@ private: TDeque<std::tuple<ui64, ui64, std::vector<arrow::Datum>>> Chunks_; + size_t BlockSizeStructIndex_ = 0; + const TMkqlIOSpecs& Specs_; arrow::MemoryPool* Pool_; }; @@ -2517,6 +2537,27 @@ void DecodeToYson(TMkqlIOCache& specsCache, size_t tableIndex, const NUdf::TUnbo WriteRowItems(specsCache, tableIndex, items, {}, ysonOut); } +ui32 GetBlockSizeStructIndex(const TMkqlIOSpecs& specs, size_t tableIndex) { + auto& decoder = specs.Inputs[tableIndex]; + + ui32 blockSizeStructIndex = 0; + switch (specs.InputBlockRepresentation_) { + case TMkqlIOSpecs::EBlockRepresentation::WideBlock: + blockSizeStructIndex = decoder->StructSize; + break; + + case TMkqlIOSpecs::EBlockRepresentation::BlockStruct: + YQL_ENSURE(decoder->FillBlockStructSize.Defined()); + blockSizeStructIndex = *decoder->FillBlockStructSize; + break; + + default: + YQL_ENSURE(false, "unknown block representation"); + } + + return blockSizeStructIndex; +} + ////////////////////////////////////////////////////////////////////////////////////////////////////////// } // NYql diff --git a/yt/yql/providers/yt/codec/yt_codec_io.h b/yt/yql/providers/yt/codec/yt_codec_io.h index 47f8d098635..3a8c4212954 100644 --- a/yt/yql/providers/yt/codec/yt_codec_io.h +++ b/yt/yql/providers/yt/codec/yt_codec_io.h @@ -164,4 +164,6 @@ void DecodeToYson(TMkqlIOCache& specsCache, size_t tableIndex, const NKikimr::NU THolder<NCommon::IBlockReader> MakeBlockReader(NYT::TRawTableReader& source, size_t blockCount, size_t blockSize); +ui32 GetBlockSizeStructIndex(const TMkqlIOSpecs& specs, size_t tableIndex); + } // NYql diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp index d935da20041..d4b65187b4f 100644 --- a/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp @@ -1,5 +1,5 @@ #include "yql_mkql_block_table_content.h" -#include "yql_mkql_file_block_stream.h" +#include "yql_mkql_file_list.h" #include <yql/essentials/minikql/computation/mkql_computation_node_impl.h> #include <yql/essentials/minikql/mkql_node_cast.h> @@ -20,19 +20,20 @@ class TYtBlockTableContentWrapper : public TMutableComputationNode<TYtBlockTable typedef TMutableComputationNode<TYtBlockTableContentWrapper> TBaseComputation; public: TYtBlockTableContentWrapper(TComputationMutables& mutables, NCommon::TCodecContext& codecCtx, - TVector<TString>&& files, const TString& inputSpec, TStructType* origStructType, bool decompress, std::optional<ui64> expectedRowCount) + TVector<TString>&& files, const TString& inputSpec, TType* listType, bool decompress, std::optional<ui64> expectedRowCount) : TBaseComputation(mutables) , Files_(std::move(files)) , Decompress_(decompress) , ExpectedRowCount_(std::move(expectedRowCount)) { Spec_.SetUseBlockInput(); + Spec_.SetInputBlockRepresentation(TMkqlIOSpecs::EBlockRepresentation::BlockStruct); Spec_.SetIsTableContent(); - Spec_.Init(codecCtx, inputSpec, {}, {}, origStructType, {}, TString()); + Spec_.Init(codecCtx, inputSpec, {}, {}, AS_TYPE(TListType, listType)->GetItemType(), {}, TString()); } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - return ctx.HolderFactory.Create<TFileWideBlockStreamValue>(Spec_, ctx.HolderFactory, Files_, Decompress_, 4, 1_MB, ExpectedRowCount_); + return ctx.HolderFactory.Create<TFileListValue>(Spec_, ctx.HolderFactory, Files_, Decompress_, 4, 1_MB, ExpectedRowCount_); } private: @@ -47,15 +48,14 @@ private: IComputationNode* WrapYtBlockTableContent(NCommon::TCodecContext& codecCtx, TComputationMutables& mutables, TCallable& callable, TStringBuf pathPrefix) { - MKQL_ENSURE(callable.GetInputsCount() == 6, "Expected 6 arguments"); + MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 arguments"); TString uniqueId(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef()); - auto origStructType = AS_TYPE(TStructType, AS_VALUE(TTypeType, callable.GetInput(1))); - const ui32 tablesCount = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui32>(); - TString inputSpec(AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().AsStringRef()); - const bool decompress = AS_VALUE(TDataLiteral, callable.GetInput(4))->AsValue().Get<bool>(); + const ui32 tablesCount = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>(); + TString inputSpec(AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().AsStringRef()); + const bool decompress = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<bool>(); std::optional<ui64> length; - TTupleLiteral* lengthTuple = AS_VALUE(TTupleLiteral, callable.GetInput(5)); + TTupleLiteral* lengthTuple = AS_VALUE(TTupleLiteral, callable.GetInput(4)); if (lengthTuple->GetValuesCount() > 0) { MKQL_ENSURE(lengthTuple->GetValuesCount() == 1, "Expect 1 element in the length tuple"); length = AS_VALUE(TDataLiteral, lengthTuple->GetValue(0))->AsValue().Get<ui64>(); @@ -67,7 +67,7 @@ IComputationNode* WrapYtBlockTableContent(NCommon::TCodecContext& codecCtx, } return new TYtBlockTableContentWrapper(mutables, codecCtx, std::move(files), inputSpec, - origStructType, decompress, length); + callable.GetType()->GetReturnType(), decompress, length); } } // NYql diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp index d814246f76e..ae32ee6bb54 100644 --- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp @@ -57,7 +57,8 @@ bool TFileInputState::NextValue() { MkqlReader_.Next(); if (Spec_->UseBlockInput_) { - auto blockCountValue = CurrentValue_.GetElement(Spec_->Inputs[CurrentInput_]->StructSize); + auto blockSizeStructIndex = GetBlockSizeStructIndex(*Spec_, CurrentInput_); + auto blockCountValue = CurrentValue_.GetElement(blockSizeStructIndex); CurrentRecord_ += GetBlockCount(blockCountValue); } else { ++CurrentRecord_; diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.cpp index 7d720cbbd5d..410abc6ca93 100644 --- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.cpp +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.cpp @@ -1,14 +1,16 @@ #include "yql_mkql_file_list.h" -#include "yql_mkql_file_input_state.h" + +#include <yql/essentials/minikql/computation/mkql_block_impl.h> namespace NYql { using namespace NKikimr::NMiniKQL; -TFileListValueBase::TIterator::TIterator(TMemoryUsageInfo* memInfo, THolder<IInputState>&& state, std::optional<ui64> length) +TFileListValueBase::TIterator::TIterator(TMemoryUsageInfo* memInfo, const TMkqlIOSpecs& spec, THolder<TFileInputState>&& state, std::optional<ui64> length) : TComputationValue(memInfo) , State_(std::move(state)) , ExpectedLength_(std::move(length)) + , Spec_(spec) { } @@ -22,19 +24,25 @@ bool TFileListValueBase::TIterator::Next(NUdf::TUnboxedValue& value) { return false; } + value = State_->GetCurrent(); if (ExpectedLength_) { MKQL_ENSURE(*ExpectedLength_ > 0, "Invalid file length. State: " << State_->DebugInfo()); - --(*ExpectedLength_); + if (Spec_.UseBlockInput_) { + auto blockSizeStructIndex = GetBlockSizeStructIndex(Spec_, State_->GetTableIndex()); + auto blockCountValue = value.GetElement(blockSizeStructIndex); + (*ExpectedLength_) -= GetBlockCount(blockCountValue); + } else { + --(*ExpectedLength_); + } } - value = State_->GetCurrent(); return true; } NUdf::TUnboxedValue TFileListValueBase::GetListIterator() const { - return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), MakeState(), Length)); + return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), Spec, MakeState(), Length)); } -THolder<IInputState> TFileListValue::MakeState() const { +THolder<TFileInputState> TFileListValue::MakeState() const { return MakeHolder<TFileInputState>(Spec, HolderFactory, MakeMkqlFileInputs(FilePaths, Decompress), BlockCount, BlockSize); } diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.h b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.h index 912a083efe5..aa0b8d184c9 100644 --- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.h +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.h @@ -3,6 +3,7 @@ #include "yql_mkql_input_stream.h" #include <yt/yql/providers/yt/codec/yt_codec.h> +#include <yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h> #include <yql/essentials/minikql/computation/mkql_computation_node.h> #include <yql/essentials/minikql/computation/mkql_custom_list.h> @@ -28,19 +29,21 @@ public: protected: class TIterator : public NKikimr::NMiniKQL::TComputationValue<TIterator> { public: - TIterator(NKikimr::NMiniKQL::TMemoryUsageInfo* memInfo, THolder<IInputState>&& state, std::optional<ui64> length); + TIterator(NKikimr::NMiniKQL::TMemoryUsageInfo* memInfo, const TMkqlIOSpecs& spec, THolder<TFileInputState>&& state, std::optional<ui64> length); private: bool Next(NUdf::TUnboxedValue& value) override; bool AtStart_ = true; - THolder<IInputState> State_; + THolder<TFileInputState> State_; std::optional<ui64> ExpectedLength_; + + const TMkqlIOSpecs& Spec_; }; NUdf::TUnboxedValue GetListIterator() const override; - virtual THolder<IInputState> MakeState() const = 0; + virtual THolder<TFileInputState> MakeState() const = 0; protected: const TMkqlIOSpecs& Spec; @@ -66,7 +69,7 @@ public: } protected: - THolder<IInputState> MakeState() const override; + THolder<TFileInputState> MakeState() const override; private: const TVector<TString> FilePaths; diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp index 65453c8137e..889c06436f7 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp @@ -102,7 +102,7 @@ public: } protected: - THolder<IInputState> MakeState() const override { + THolder<TFileInputState> MakeState() const override { return MakeHolder<TFileInputStateWithTableState>(Spec, HolderFactory, MakeTextYsonInputs(TablePaths_), 0u, 1_MB, TTableState(TableState_)); } diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp index f1ffba0d200..2296595dc7e 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp @@ -639,13 +639,7 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { output.Ref(), itemsCount, ctx, true); } - return ctx.ProgramBuilder.WideToBlocks(ctx.ProgramBuilder.FromFlow(ctx.ProgramBuilder.ExpandMap(ctx.ProgramBuilder.ToFlow(values), [&](TRuntimeNode item) -> TRuntimeNode::TList { - TRuntimeNode::TList result; - for (auto& origItem : origItemStructType->GetItems()) { - result.push_back(ctx.ProgramBuilder.Member(item, origItem->GetName())); - } - return result; - }))); + return ctx.ProgramBuilder.ListToBlocks(values); }); compiler.AddCallable({TYtSort::CallableName(), TYtCopy::CallableName(), TYtMerge::CallableName()}, diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp index 26af2460bf8..18e6204a58d 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp @@ -82,11 +82,7 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { if (EPhase::Content == Phase_ || EPhase::All == Phase_) { return [&, name, useBlocks](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) { - if (useBlocks) { - YQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); - } else { - YQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args"); - } + YQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args"); const TString cluster = ExecCtx_.Cluster_; const TString tmpFolder = GetTablesTmpFolder(*Settings_); @@ -331,7 +327,6 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { callable.GetType()->GetReturnType()); if (useBlocks) { call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(uniqueId)); - call.Add(callable.GetInput(3)); // orig struct type call.Add(PgmBuilder_.NewDataLiteral(tableList->GetItemsCount())); call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(NYT::NodeToYsonString(specNode))); call.Add(PgmBuilder_.NewDataLiteral(ETableContentDeliveryMode::File == deliveryMode)); // use compression diff --git a/yt/yql/providers/yt/job/yql_job_user.cpp b/yt/yql/providers/yt/job/yql_job_user.cpp index f9e923ccffc..0b355e6609d 100644 --- a/yt/yql/providers/yt/job/yql_job_user.cpp +++ b/yt/yql/providers/yt/job/yql_job_user.cpp @@ -171,6 +171,7 @@ void TYqlUserJob::DoImpl(const TFile& inHandle, const TVector<TFile>& outHandles } if (UseBlockInput) { MkqlIOSpecs->SetUseBlockInput(); + MkqlIOSpecs->SetInputBlockRepresentation(TMkqlIOSpecs::EBlockRepresentation::WideBlock); } if (UseBlockOutput) { MkqlIOSpecs->SetUseBlockOutput(); diff --git a/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp b/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp index a55d9baf659..8d6b31a1571 100644 --- a/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp +++ b/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp @@ -372,6 +372,7 @@ namespace NYql { TStringBuf("Last"), TStringBuf("ToDict"), TStringBuf("SqueezeToDict"), + TStringBuf("BlockStorage"), TStringBuf("Iterator"), // Why? TStringBuf("Collect"), TStringBuf("Length"), diff --git a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp index 5f0f3e07396..37313115425 100644 --- a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp @@ -68,46 +68,19 @@ private: return EnsureWideFlowType(mapLambda.Cast().Args().Arg(0).Ref(), ctx); } - TMaybeNode<TExprBase> TryTransformTableContent(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { + TMaybeNode<TExprBase> TryTransformTableContent(TExprBase node, TExprContext& ctx) const { auto tableContent = node.Cast<TYtTableContent>(); if (!NYql::HasSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady)) { return tableContent; } - const TParentsMap* parentsMap = getParents(); - if (auto it = parentsMap->find(tableContent.Raw()); it != parentsMap->end() && it->second.size() > 1) { - return tableContent; - } - YQL_CLOG(INFO, ProviderYt) << "Rewrite YtTableContent with block input"; - auto inputStructType = GetSeqItemType(tableContent.Ref().GetTypeAnn())->Cast<TStructExprType>(); - auto asStructBuilder = Build<TCoAsStruct>(ctx, tableContent.Pos()); - TExprNode::TListType narrowMapArgs; - for (auto& item : inputStructType->GetItems()) { - auto arg = ctx.NewArgument(tableContent.Pos(), item->GetName()); - asStructBuilder.Add<TCoNameValueTuple>() - .Name().Build(item->GetName()) - .Value(arg) - .Build(); - narrowMapArgs.push_back(std::move(arg)); - } - auto settings = RemoveSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady, ctx); - return Build<TCoForwardList>(ctx, tableContent.Pos()) - .Stream<TCoNarrowMap>() - .Input<TCoToFlow>() - .Input<TCoWideFromBlocks>() - .Input<TYtBlockTableContent>() - .Input(tableContent.Input()) - .Settings(settings) - .Build() - .Build() - .Build() - .Lambda() - .Args(narrowMapArgs) - .Body(asStructBuilder.Done()) - .Build() + return Build<TCoListFromBlocks>(ctx, tableContent.Pos()) + .Input<TYtBlockTableContent>() + .Input(tableContent.Input()) + .Settings(settings) .Build() .Done(); } diff --git a/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp b/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp index fcad1669af7..9e3bd1fbc2c 100644 --- a/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp @@ -189,42 +189,9 @@ public: return TStatus::Ok; } - TStatus HandleBlockTableContent(TExprBase input, TExprContext& ctx) { + TStatus HandleBlockTableContent(TExprBase input, TExprContext& /*ctx*/) { TYtBlockTableContent tableContent = input.Cast<TYtBlockTableContent>(); - - auto listType = tableContent.Input().Maybe<TYtOutput>() - ? tableContent.Input().Ref().GetTypeAnn() - : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back(); - auto itemStructType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); - - auto pathRename = [&](TPartOfConstraintBase::TPathType path) -> std::vector<TPartOfConstraintBase::TPathType> { - YQL_ENSURE(!path.empty()); - - auto fieldIndex = itemStructType->FindItem(path[0]); - YQL_ENSURE(fieldIndex.Defined()); - - path[0] = ctx.GetIndexAsString(*fieldIndex); - return { path }; - }; - - TConstraintSet wideConstraints; - for (auto constraint : tableContent.Input().Ref().GetAllConstraints()) { - if (auto empty = dynamic_cast<const TEmptyConstraintNode*>(constraint)) { - wideConstraints.AddConstraint(ctx.MakeConstraint<TEmptyConstraintNode>()); - } else if (auto sorted = dynamic_cast<const TSortedConstraintNode*>(constraint)) { - wideConstraints.AddConstraint(sorted->RenameFields(ctx, pathRename)); - } else if (auto chopped = dynamic_cast<const TChoppedConstraintNode*>(constraint)) { - wideConstraints.AddConstraint(chopped->RenameFields(ctx, pathRename)); - } else if (auto unique = dynamic_cast<const TUniqueConstraintNode*>(constraint)) { - wideConstraints.AddConstraint(unique->RenameFields(ctx, pathRename)); - } else if (auto distinct = dynamic_cast<const TDistinctConstraintNode*>(constraint)) { - wideConstraints.AddConstraint(distinct->RenameFields(ctx, pathRename)); - } else { - YQL_ENSURE(false, "unexpected constraint"); - } - } - - input.Ptr()->SetConstraints(wideConstraints); + input.Ptr()->CopyConstraints(tableContent.Input().Ref()); return TStatus::Ok; } diff --git a/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp b/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp index 69dec1f558d..84c811cc816 100644 --- a/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp @@ -898,14 +898,26 @@ public: auto listType = tableContent.Input().Maybe<TYtOutput>() ? tableContent.Input().Ref().GetTypeAnn() : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back(); - auto itemStructType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + auto tableStructType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + + TVector<const TItemExprType*> outputStructItems; + for (auto item : tableStructType->GetItems()) { + auto itemType = item->GetItemType(); + if (itemType->IsBlockOrScalar()) { + ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), "Input type should not be a block or scalar")); + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureSupportedAsBlockType(input.Pos(), *itemType, ctx, *State_->Types)) { + return IGraphTransformer::TStatus::Error; + } - TTypeAnnotationNode::TListType multiTypeItems; - for (auto& item: itemStructType->GetItems()) { - multiTypeItems.emplace_back(ctx.MakeType<TBlockExprType>(item->GetItemType())); + outputStructItems.push_back(ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(itemType))); } - multiTypeItems.push_back(ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64))); - input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TMultiExprType>(multiTypeItems))); + outputStructItems.push_back(ctx.MakeType<TItemExprType>(BlockLengthColumnName, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)))); + + auto outputStructType = ctx.MakeType<TStructExprType>(outputStructItems); + input.Ptr()->SetTypeAnn(ctx.MakeType<TListExprType>(outputStructType)); if (auto columnOrder = State_->Types->LookupColumnOrder(tableContent.Input().Ref())) { return State_->Types->SetColumnOrder(input.Ref(), *columnOrder, ctx); diff --git a/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp b/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp index 0c3a1a208ba..a6c34dc8155 100644 --- a/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp @@ -232,23 +232,13 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName, samplingTupleItems.push_back(ctx.ProgramBuilder.NewDataLiteral(isSystemSampling)); } - TType* outType = nullptr; if (useBlocks) { - auto structType = AS_TYPE(TStructType, outItemType); - - std::vector<TType*> outputItems; - outputItems.reserve(structType->GetMembersCount()); - for (size_t i = 0; i < structType->GetMembersCount(); i++) { - outputItems.push_back(ctx.ProgramBuilder.NewBlockType(structType->GetMemberType(i), TBlockType::EShape::Many)); - } - outputItems.push_back(ctx.ProgramBuilder.NewBlockType(ctx.ProgramBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); - outType = ctx.ProgramBuilder.NewStreamType(ctx.ProgramBuilder.NewMultiType(outputItems)); - - } else { - outType = ctx.ProgramBuilder.NewListType(outItemType); + outItemType = ctx.ProgramBuilder.BuildBlockStructType(AS_TYPE(TStructType, outItemType)); } - TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), callName, outType); + auto outListType = ctx.ProgramBuilder.NewListType(outItemType); + + TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), callName, outListType); call.Add(ctx.ProgramBuilder.NewList(listTypeGroup, groups)); call.Add(ctx.ProgramBuilder.NewTuple(samplingTupleItems)); @@ -259,10 +249,6 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName, call.Add(ctx.ProgramBuilder.NewEmptyTuple()); } - if (useBlocks) { - call.Add(TRuntimeNode(outItemType, true)); - } - auto res = TRuntimeNode(call.Build(), false); if (settings) { @@ -505,8 +491,8 @@ void RegisterYtMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { TYtBlockTableContent tableContent(&node); if (node.GetConstraint<TEmptyConstraintNode>()) { - const auto streamType = ctx.BuildType(node, *node.GetTypeAnn()); - return ctx.ProgramBuilder.EmptyIterator(streamType); + const auto itemType = ctx.BuildType(node, GetSeqItemType(*node.GetTypeAnn())); + return ctx.ProgramBuilder.NewEmptyList(itemType); } auto origItemStructType = ( |