summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorziganshinmr <[email protected]>2025-03-31 15:50:04 +0300
committerziganshinmr <[email protected]>2025-03-31 16:08:18 +0300
commit30647eeae1a5f0019fd1b41b2a2835221e49ed46 (patch)
tree6890b601da77233fe0af248b034a635fef338a72
parent06c199b566aca570faf8661b96ea873a229f923a (diff)
YT block table content refactor
* Migrate YtBlockTableContent node to block struct type * Use ListFromBlock to resolve problems with multipage * Fix problems with Arrow decoder related to multiple files read commit_hash:4a2f5dd57c1359b17a9161ccb5c88754125609cb
-rw-r--r--yql/essentials/core/expr_nodes/yql_expr_nodes.json10
-rw-r--r--yql/essentials/core/type_ann/type_ann_blocks.cpp53
-rw-r--r--yql/essentials/core/type_ann/type_ann_blocks.h2
-rw-r--r--yql/essentials/core/type_ann/type_ann_core.cpp2
-rw-r--r--yql/essentials/core/yql_expr_constraint.cpp2
-rw-r--r--yql/essentials/core/yql_expr_type_annotation.cpp55
-rw-r--r--yql/essentials/core/yql_expr_type_annotation.h2
-rw-r--r--yql/essentials/core/yql_type_annotation.cpp2
-rw-r--r--yql/essentials/minikql/mkql_type_builder.cpp1
-rw-r--r--yql/essentials/providers/common/mkql/yql_provider_mkql.cpp2
-rw-r--r--yt/yql/providers/yt/codec/yt_codec.cpp13
-rw-r--r--yt/yql/providers/yt/codec/yt_codec.h13
-rw-r--r--yt/yql/providers/yt/codec/yt_codec_io.cpp65
-rw-r--r--yt/yql/providers/yt/codec/yt_codec_io.h2
-rw-r--r--yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp22
-rw-r--r--yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp3
-rw-r--r--yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.cpp20
-rw-r--r--yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.h11
-rw-r--r--yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp2
-rw-r--r--yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp8
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp7
-rw-r--r--yt/yql/providers/yt/job/yql_job_user.cpp1
-rw-r--r--yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp1
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_block_input.cpp37
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp37
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp24
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp26
27 files changed, 280 insertions, 143 deletions
diff --git a/yql/essentials/core/expr_nodes/yql_expr_nodes.json b/yql/essentials/core/expr_nodes/yql_expr_nodes.json
index 7eb01b6114c..c051517eee9 100644
--- a/yql/essentials/core/expr_nodes/yql_expr_nodes.json
+++ b/yql/essentials/core/expr_nodes/yql_expr_nodes.json
@@ -2557,6 +2557,11 @@
"Match": {"Type": "Callable", "Name": "WideFromBlocks"}
},
{
+ "Name": "TCoListFromBlocks",
+ "Base": "TCoInputBase",
+ "Match": {"Type": "Callable", "Name": "ListFromBlocks"}
+ },
+ {
"Name": "TCoReplicateScalars",
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "ReplicateScalars"},
@@ -2570,6 +2575,11 @@
"Match": {"Type": "Callable", "Name": "WideToBlocks"}
},
{
+ "Name": "TCoListToBlocks",
+ "Base": "TCoInputBase",
+ "Match": {"Type": "Callable", "Name": "ListToBlocks"}
+ },
+ {
"Name": "TCoPgSelect",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "PgSelect"},
diff --git a/yql/essentials/core/type_ann/type_ann_blocks.cpp b/yql/essentials/core/type_ann/type_ann_blocks.cpp
index c674d61ed5c..20f09c46961 100644
--- a/yql/essentials/core/type_ann/type_ann_blocks.cpp
+++ b/yql/essentials/core/type_ann/type_ann_blocks.cpp
@@ -906,6 +906,43 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx
return IGraphTransformer::TStatus::Ok;
}
+IGraphTransformer::TStatus ListToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 1U, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureListType(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto listItemType = input->Head().GetTypeAnn()->Cast<TListExprType>()->GetItemType();
+ if (!EnsureStructType(input->Head().Pos(), *listItemType, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ const auto structType = listItemType->Cast<TStructExprType>();
+
+ TVector<const TItemExprType*> outputStructItems;
+ for (auto item : structType->GetItems()) {
+ auto itemType = item->GetItemType();
+ if (itemType->IsBlockOrScalar()) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Input type should not be a block or scalar"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureSupportedAsBlockType(input->Pos(), *itemType, ctx.Expr, ctx.Types)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ outputStructItems.push_back(ctx.Expr.MakeType<TItemExprType>(item->GetName(), ctx.Expr.MakeType<TBlockExprType>(itemType)));
+ }
+ outputStructItems.push_back(ctx.Expr.MakeType<TItemExprType>(BlockLengthColumnName, ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64))));
+
+ auto outputStructType = ctx.Expr.MakeType<TStructExprType>(outputStructItems);
+ input->SetTypeAnn(ctx.Expr.MakeType<TListExprType>(outputStructType));
+ return IGraphTransformer::TStatus::Ok;
+}
+
IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
if (!EnsureArgsCount(*input, 1U, ctx.Expr)) {
@@ -924,6 +961,22 @@ IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, T
return IGraphTransformer::TStatus::Ok;
}
+IGraphTransformer::TStatus ListFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ Y_UNUSED(output);
+ if (!EnsureArgsCount(*input, 1U, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ TVector<const TItemExprType*> outputStructItems;
+ if (!EnsureBlockListType(input->Head(), outputStructItems, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ auto outputStructType = ctx.Expr.MakeType<TStructExprType>(outputStructItems);
+ input->SetTypeAnn(ctx.Expr.MakeType<TListExprType>(outputStructType));
+ return IGraphTransformer::TStatus::Ok;
+}
+
IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
if (!EnsureArgsCount(*input, 2U, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
diff --git a/yql/essentials/core/type_ann/type_ann_blocks.h b/yql/essentials/core/type_ann/type_ann_blocks.h
index 8a16376f9b7..223758ef7fc 100644
--- a/yql/essentials/core/type_ann/type_ann_blocks.h
+++ b/yql/essentials/core/type_ann/type_ann_blocks.h
@@ -28,7 +28,9 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
+ IGraphTransformer::TStatus ListToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus WideFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ IGraphTransformer::TStatus ListFromBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus WideSkipTakeBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus WideTopBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus WideSortBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
diff --git a/yql/essentials/core/type_ann/type_ann_core.cpp b/yql/essentials/core/type_ann/type_ann_core.cpp
index 75e07ab3a72..cd6bd71608b 100644
--- a/yql/essentials/core/type_ann/type_ann_core.cpp
+++ b/yql/essentials/core/type_ann/type_ann_core.cpp
@@ -12983,6 +12983,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["NarrowMultiMap"] = &NarrowMultiMapWrapper;
Functions["WideFromBlocks"] = &WideFromBlocksWrapper;
+ Functions["ListFromBlocks"] = &ListFromBlocksWrapper;
Functions["WideSkipBlocks"] = &WideSkipTakeBlocksWrapper;
Functions["WideTakeBlocks"] = &WideSkipTakeBlocksWrapper;
Functions["BlockCompress"] = &BlockCompressWrapper;
@@ -13018,6 +13019,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
ExtFunctions["AsScalar"] = &AsScalarWrapper;
ExtFunctions["WideToBlocks"] = &WideToBlocksWrapper;
+ ExtFunctions["ListToBlocks"] = &ListToBlocksWrapper;
ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper;
ExtFunctions["BlockCombineHashed"] = &BlockCombineHashedWrapper;
ExtFunctions["BlockMergeFinalizeHashed"] = &BlockMergeFinalizeHashedWrapper;
diff --git a/yql/essentials/core/yql_expr_constraint.cpp b/yql/essentials/core/yql_expr_constraint.cpp
index 32cba8f9d28..fdfc21946be 100644
--- a/yql/essentials/core/yql_expr_constraint.cpp
+++ b/yql/essentials/core/yql_expr_constraint.cpp
@@ -245,7 +245,9 @@ public:
Functions["WideTopSortBlocks"] = &TCallableConstraintTransformer::WideTopWrap<true>;
Functions["WideSortBlocks"] = &TCallableConstraintTransformer::WideTopWrap<true>;
Functions["WideToBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
+ Functions["ListToBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
Functions["WideFromBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
+ Functions["ListFromBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
Functions["ReplicateScalars"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
Functions["BlockMergeFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
Functions["BlockMergeManyFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
diff --git a/yql/essentials/core/yql_expr_type_annotation.cpp b/yql/essentials/core/yql_expr_type_annotation.cpp
index 80207d35bc2..16e29a60c8e 100644
--- a/yql/essentials/core/yql_expr_type_annotation.cpp
+++ b/yql/essentials/core/yql_expr_type_annotation.cpp
@@ -10,6 +10,7 @@
#include <yql/essentials/minikql/dom/json.h>
#include <yql/essentials/minikql/dom/yson.h>
#include <yql/essentials/minikql/jsonpath/parser/parser.h>
+#include <yql/essentials/core/sql_types/block.h>
#include <yql/essentials/core/sql_types/simple_types.h>
#include "yql/essentials/parser/pg_catalog/catalog.h"
#include <yql/essentials/parser/pg_wrapper/interface/utils.h>
@@ -3269,6 +3270,52 @@ bool EnsureWideBlockType(TPositionHandle position, const TTypeAnnotationNode& ty
return true;
}
+bool EnsureBlockStructType(TPositionHandle position, const TTypeAnnotationNode& type, TVector<const TItemExprType*>& structItems, TExprContext& ctx) {
+ if (HasError(&type, ctx)) {
+ return false;
+ }
+
+ if (type.GetKind() != ETypeAnnotationKind::Struct) {
+ ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Expected struct, but got: " << type));
+ return false;
+ }
+
+ auto& items = type.Cast<TStructExprType>()->GetItems();
+ if (items.empty()) {
+ ctx.AddError(TIssue(ctx.GetPosition(position), "Expected at least one column"));
+ return false;
+ }
+
+ bool hasBlockLengthColumn = false;
+ for (auto item : items) {
+ auto blockType = item->GetItemType();
+ if (!EnsureBlockOrScalarType(position, *blockType, ctx)) {
+ return false;
+ }
+
+ bool isScalar = false;
+ auto itemType = GetBlockItemType(*blockType, isScalar);
+
+ if (item->GetName() == BlockLengthColumnName) {
+ if (!isScalar) {
+ ctx.AddError(TIssue(ctx.GetPosition(position), "Block length column should be a scalar"));
+ return false;
+ }
+ if (!EnsureSpecificDataType(position, *itemType, EDataSlot::Uint64, ctx)) {
+ return false;
+ }
+ hasBlockLengthColumn = true;
+ } else {
+ structItems.push_back(ctx.MakeType<TItemExprType>(item->GetName(), itemType));
+ }
+ }
+ if (!hasBlockLengthColumn) {
+ ctx.AddError(TIssue(ctx.GetPosition(position), "Block struct must contain block length column"));
+ return false;
+ }
+ return true;
+}
+
bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar) {
if (!EnsureWideFlowType(node, ctx)) {
return false;
@@ -3285,6 +3332,14 @@ bool EnsureWideStreamBlockType(const TExprNode& node, TTypeAnnotationNode::TList
return EnsureWideBlockType(node.Pos(), *node.GetTypeAnn()->Cast<TStreamExprType>()->GetItemType(), blockItemTypes, ctx, allowScalar);
}
+bool EnsureBlockListType(const TExprNode& node, TVector<const TItemExprType*>& structItems, TExprContext& ctx) {
+ if (!EnsureListType(node, ctx)) {
+ return false;
+ }
+
+ return EnsureBlockStructType(node.Pos(), *node.GetTypeAnn()->Cast<TListExprType>()->GetItemType(), structItems, ctx);
+}
+
bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx) {
if (!node.GetTypeAnn()) {
YQL_ENSURE(node.Type() == TExprNode::Lambda);
diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h
index b429f46f394..8538c45ab18 100644
--- a/yql/essentials/core/yql_expr_type_annotation.h
+++ b/yql/essentials/core/yql_expr_type_annotation.h
@@ -134,8 +134,10 @@ bool IsWideSequenceBlockType(const TTypeAnnotationNode& type);
bool IsSupportedAsBlockType(TPositionHandle pos, const TTypeAnnotationNode& type, TExprContext& ctx, TTypeAnnotationContext& types, bool reportUnspported = false);
bool EnsureSupportedAsBlockType(TPositionHandle pos, const TTypeAnnotationNode& type, TExprContext& ctx, TTypeAnnotationContext& types);
bool EnsureWideBlockType(TPositionHandle position, const TTypeAnnotationNode& type, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true);
+bool EnsureBlockStructType(TPositionHandle position, const TTypeAnnotationNode& type, TVector<const TItemExprType*>& structItems, TExprContext& ctx);
bool EnsureWideFlowBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true);
bool EnsureWideStreamBlockType(const TExprNode& node, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true);
+bool EnsureBlockListType(const TExprNode& node, TVector<const TItemExprType*>& structItems, TExprContext& ctx);
bool EnsureOptionalType(const TExprNode& node, TExprContext& ctx);
bool EnsureOptionalType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
bool EnsureType(const TExprNode& node, TExprContext& ctx);
diff --git a/yql/essentials/core/yql_type_annotation.cpp b/yql/essentials/core/yql_type_annotation.cpp
index 8934949d3fe..6de6ecf8bdf 100644
--- a/yql/essentials/core/yql_type_annotation.cpp
+++ b/yql/essentials/core/yql_type_annotation.cpp
@@ -303,7 +303,7 @@ IGraphTransformer::TStatus TTypeAnnotationContext::SetColumnOrder(const TExprNod
allColumns.erase(it);
}
- if (!allColumns.empty()) {
+ if (!allColumns.empty() && !(allColumns.size() == 1 && *allColumns.begin() == BlockLengthColumnName)) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()),
TStringBuilder() << "Some columns are left unordered with column order " << FormatColumnOrder(columnOrder) << " for node "
<< node.Content() << " with type: " << *node.GetTypeAnn()));
diff --git a/yql/essentials/minikql/mkql_type_builder.cpp b/yql/essentials/minikql/mkql_type_builder.cpp
index c9d6e363b4f..41502b144ae 100644
--- a/yql/essentials/minikql/mkql_type_builder.cpp
+++ b/yql/essentials/minikql/mkql_type_builder.cpp
@@ -2820,7 +2820,6 @@ TType* TTypeBuilder::ValidateBlockStructType(const TStructType* structType) cons
MKQL_ENSURE(isScalar, "Block length column should be scalar");
MKQL_ENSURE(AS_TYPE(TDataType, itemType)->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected Uint64");
- MKQL_ENSURE(!hasBlockLengthColumn, "Block struct must contain only one block length column");
hasBlockLengthColumn = true;
} else {
outStructItems.emplace_back(structType->GetMemberName(i), itemType);
diff --git a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp
index 983bf855424..abe2f9c1e9a 100644
--- a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp
+++ b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp
@@ -448,7 +448,9 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
{"FromFlow", &TProgramBuilder::FromFlow},
{"WideToBlocks", &TProgramBuilder::WideToBlocks},
+ {"ListToBlocks", &TProgramBuilder::ListToBlocks},
{"WideFromBlocks", &TProgramBuilder::WideFromBlocks},
+ {"ListFromBlocks", &TProgramBuilder::ListFromBlocks},
{"AsScalar", &TProgramBuilder::AsScalar},
{"Just", &TProgramBuilder::NewOptional},
diff --git a/yt/yql/providers/yt/codec/yt_codec.cpp b/yt/yql/providers/yt/codec/yt_codec.cpp
index 6e64136937f..14e317df97f 100644
--- a/yt/yql/providers/yt/codec/yt_codec.cpp
+++ b/yt/yql/providers/yt/codec/yt_codec.cpp
@@ -305,6 +305,13 @@ void TMkqlIOSpecs::InitDecoder(NCommon::TCodecContext& codecCtx,
}
}
+ if (InputBlockRepresentation_ == EBlockRepresentation::BlockStruct) {
+ if (auto pos = rowType->FindMemberIndex(BlockLengthColumnName)) {
+ virtualColumns.insert(*pos);
+ decoder.FillBlockStructSize = pos;
+ }
+ }
+
THashSet<ui32> usedPos;
for (ui32 index = 0; index < rowType->GetMembersCount(); ++index) {
auto name = rowType->GetMemberNameStr(index);
@@ -444,6 +451,7 @@ void TMkqlIOSpecs::InitInput(NCommon::TCodecContext& codecCtx,
TSpecInfo localSpecInfo;
TSpecInfo* specInfo = &localSpecInfo;
TString decoderRefName = TStringBuilder() << "_internal" << inputIndex;
+ bool newSpec = false;
if (inputSpecs[inputIndex].IsString()) {
auto refName = inputSpecs[inputIndex].AsString();
decoderRefName = refName;
@@ -453,9 +461,14 @@ void TMkqlIOSpecs::InitInput(NCommon::TCodecContext& codecCtx,
Y_ENSURE(inAttrs.HasKey(YqlIOSpecRegistry) && inAttrs[YqlIOSpecRegistry].HasKey(refName), "Bad input registry reference: " << refName);
specInfo = &specInfoRegistry[refName];
LoadSpecInfo(true, inAttrs[YqlIOSpecRegistry][refName], codecCtx, *specInfo);
+ newSpec = true;
}
} else {
LoadSpecInfo(true, inputSpecs[inputIndex], codecCtx, localSpecInfo);
+ newSpec = true;
+ }
+ if (InputBlockRepresentation_ == EBlockRepresentation::BlockStruct && newSpec) {
+ specInfo->Type = codecCtx.Builder.NewStructType(specInfo->Type, BlockLengthColumnName, TDataType::Create(NUdf::TDataType<ui64>::Id, codecCtx.Env));
}
TStructType* inStruct = AS_TYPE(TStructType, specInfo->Type);
diff --git a/yt/yql/providers/yt/codec/yt_codec.h b/yt/yql/providers/yt/codec/yt_codec.h
index 4e6ef543a5f..ed309564799 100644
--- a/yt/yql/providers/yt/codec/yt_codec.h
+++ b/yt/yql/providers/yt/codec/yt_codec.h
@@ -31,6 +31,12 @@ public:
Y_DECLARE_FLAGS(TSystemFields, ESystemField);
+ enum class EBlockRepresentation {
+ None,
+ WideBlock,
+ BlockStruct,
+ };
+
struct TSpecInfo {
NKikimr::NMiniKQL::TType* Type = nullptr;
bool StrictSchema = true;
@@ -65,6 +71,7 @@ public:
TMaybe<ui32> FillSysColumnIndex;
TMaybe<ui32> FillSysColumnNum;
TMaybe<ui32> FillSysColumnKeySwitch;
+ TMaybe<ui32> FillBlockStructSize;
};
struct TEncoderSpec {
@@ -137,6 +144,10 @@ public:
IsTableContent_ = true;
}
+ void SetInputBlockRepresentation(EBlockRepresentation type) {
+ InputBlockRepresentation_ = type;
+ }
+
void SetTableOffsets(const TVector<ui64>& offsets);
void Clear();
@@ -156,6 +167,8 @@ public:
TString OptLLVM_;
TSystemFields SystemFields_;
+ EBlockRepresentation InputBlockRepresentation_ = EBlockRepresentation::None;
+
NKikimr::NMiniKQL::IStatsRegistry* JobStats_ = nullptr;
THashMap<TString, TDecoderSpec> Decoders;
TVector<const TDecoderSpec*> Inputs;
diff --git a/yt/yql/providers/yt/codec/yt_codec_io.cpp b/yt/yql/providers/yt/codec/yt_codec_io.cpp
index 0a6b31f4e77..a46ecc0f680 100644
--- a/yt/yql/providers/yt/codec/yt_codec_io.cpp
+++ b/yt/yql/providers/yt/codec/yt_codec_io.cpp
@@ -651,7 +651,7 @@ struct TMkqlReaderImpl::TDecoder {
KeySwitch_ = false;
}
- void Reset(bool hasRangeIndices, ui32 tableIndex, bool ignoreStreamTableIndex) {
+ virtual void Reset(bool hasRangeIndices, ui32 tableIndex, bool ignoreStreamTableIndex) {
HasRangeIndices_ = hasRangeIndices;
TableIndex_ = tableIndex;
AtStart_ = true;
@@ -1463,7 +1463,7 @@ public:
, Pool_(pool)
{
InputStream_ = std::make_unique<TInputBufArrowInputStream>(buf, pool);
- ResetColumnConverters();
+ HandleTableSwitch();
HandlesSysColumns_ = true;
}
@@ -1482,14 +1482,19 @@ public:
YQL_ENSURE(!Chunks_.empty());
}
+ bool isWideBlock = (Specs_.InputBlockRepresentation_ == TMkqlIOSpecs::EBlockRepresentation::WideBlock);
+
auto& decoder = *Specs_.Inputs[TableIndex_];
- Row_ = SpecsCache_.NewRow(TableIndex_, items, true);
+ Row_ = SpecsCache_.NewRow(TableIndex_, items, isWideBlock);
auto& [chunkRowIndex, chunkLen, chunk] = Chunks_.front();
for (size_t i = 0; i < decoder.StructSize; i++) {
+ if (i == decoder.FillBlockStructSize) {
+ continue;
+ }
items[i] = SpecsCache_.GetHolderFactory().CreateArrowBlock(std::move(chunk[i]));
}
- items[decoder.StructSize] = SpecsCache_.GetHolderFactory().CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(chunkLen)));
+ items[BlockSizeStructIndex_] = SpecsCache_.GetHolderFactory().CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(chunkLen)));
RowIndex_ = chunkRowIndex;
Chunks_.pop_front();
@@ -1505,17 +1510,17 @@ public:
}
StreamReader_ = ARROW_RESULT(streamReaderResult);
- auto oldTableIndex = TableIndex_;
if (!IgnoreStreamTableIndex) {
+ auto oldTableIndex = TableIndex_;
auto tableIdKey = StreamReader_->schema()->metadata()->Get("TableId");
if (tableIdKey.ok()) {
TableIndex_ = std::stoi(tableIdKey.ValueOrDie());
YQL_ENSURE(TableIndex_ < Specs_.Inputs.size());
}
- }
- if (TableIndex_ != oldTableIndex) {
- ResetColumnConverters();
+ if (TableIndex_ != oldTableIndex) {
+ HandleTableSwitch();
+ }
}
}
@@ -1523,6 +1528,8 @@ public:
ARROW_OK(StreamReader_->ReadNext(&batch));
if (!batch) {
if (InputStream_->EOSReached()) {
+ // Prepare for possible table switch
+ StreamReader_.reset();
return false;
}
@@ -1565,6 +1572,9 @@ public:
}
} else if (decoder.FillSysColumnIndex == inputFields[i].StructIndex) {
convertedColumn = ARROW_RESULT(arrow::MakeArrayFromScalar(arrow::UInt32Scalar(TableIndex_), batch->num_rows()));
+ } else if (decoder.FillBlockStructSize == inputFields[i].StructIndex) {
+ // Actual value will be specified later
+ convertedColumn = arrow::Datum(static_cast<uint64_t>(0));
} else if (inputFields[i].StructIndex == Max<ui32>()) {
// Input field won't appear in the result
continue;
@@ -1593,14 +1603,22 @@ public:
return true;
}
- void ResetColumnConverters() {
- auto& fields = Specs_.Inputs[TableIndex_]->FieldsVec;
+ void HandleTableSwitch() {
+ auto& decoder = Specs_.Inputs[TableIndex_];
+
ColumnConverters_.clear();
- ColumnConverters_.reserve(fields.size());
- for (auto& field: fields) {
+ ColumnConverters_.reserve(decoder->FieldsVec.size());
+ for (auto& field: decoder->FieldsVec) {
YQL_ENSURE(!field.Type->IsPg());
ColumnConverters_.emplace_back(MakeYtColumnConverter(field.Type, nullptr, *Pool_, Specs_.Inputs[TableIndex_]->NativeYtTypeFlags));
}
+
+ BlockSizeStructIndex_ = GetBlockSizeStructIndex(Specs_, TableIndex_);
+ }
+
+ void Reset(bool hasRangeIndices, ui32 tableIndex, bool ignoreStreamTableIndex) override {
+ TDecoder::Reset(hasRangeIndices, tableIndex, ignoreStreamTableIndex);
+ HandleTableSwitch();
}
private:
@@ -1610,6 +1628,8 @@ private:
TDeque<std::tuple<ui64, ui64, std::vector<arrow::Datum>>> Chunks_;
+ size_t BlockSizeStructIndex_ = 0;
+
const TMkqlIOSpecs& Specs_;
arrow::MemoryPool* Pool_;
};
@@ -2517,6 +2537,27 @@ void DecodeToYson(TMkqlIOCache& specsCache, size_t tableIndex, const NUdf::TUnbo
WriteRowItems(specsCache, tableIndex, items, {}, ysonOut);
}
+ui32 GetBlockSizeStructIndex(const TMkqlIOSpecs& specs, size_t tableIndex) {
+ auto& decoder = specs.Inputs[tableIndex];
+
+ ui32 blockSizeStructIndex = 0;
+ switch (specs.InputBlockRepresentation_) {
+ case TMkqlIOSpecs::EBlockRepresentation::WideBlock:
+ blockSizeStructIndex = decoder->StructSize;
+ break;
+
+ case TMkqlIOSpecs::EBlockRepresentation::BlockStruct:
+ YQL_ENSURE(decoder->FillBlockStructSize.Defined());
+ blockSizeStructIndex = *decoder->FillBlockStructSize;
+ break;
+
+ default:
+ YQL_ENSURE(false, "unknown block representation");
+ }
+
+ return blockSizeStructIndex;
+}
+
//////////////////////////////////////////////////////////////////////////////////////////////////////////
} // NYql
diff --git a/yt/yql/providers/yt/codec/yt_codec_io.h b/yt/yql/providers/yt/codec/yt_codec_io.h
index 47f8d098635..3a8c4212954 100644
--- a/yt/yql/providers/yt/codec/yt_codec_io.h
+++ b/yt/yql/providers/yt/codec/yt_codec_io.h
@@ -164,4 +164,6 @@ void DecodeToYson(TMkqlIOCache& specsCache, size_t tableIndex, const NKikimr::NU
THolder<NCommon::IBlockReader> MakeBlockReader(NYT::TRawTableReader& source, size_t blockCount, size_t blockSize);
+ui32 GetBlockSizeStructIndex(const TMkqlIOSpecs& specs, size_t tableIndex);
+
} // NYql
diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp
index d935da20041..d4b65187b4f 100644
--- a/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp
+++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp
@@ -1,5 +1,5 @@
#include "yql_mkql_block_table_content.h"
-#include "yql_mkql_file_block_stream.h"
+#include "yql_mkql_file_list.h"
#include <yql/essentials/minikql/computation/mkql_computation_node_impl.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
@@ -20,19 +20,20 @@ class TYtBlockTableContentWrapper : public TMutableComputationNode<TYtBlockTable
typedef TMutableComputationNode<TYtBlockTableContentWrapper> TBaseComputation;
public:
TYtBlockTableContentWrapper(TComputationMutables& mutables, NCommon::TCodecContext& codecCtx,
- TVector<TString>&& files, const TString& inputSpec, TStructType* origStructType, bool decompress, std::optional<ui64> expectedRowCount)
+ TVector<TString>&& files, const TString& inputSpec, TType* listType, bool decompress, std::optional<ui64> expectedRowCount)
: TBaseComputation(mutables)
, Files_(std::move(files))
, Decompress_(decompress)
, ExpectedRowCount_(std::move(expectedRowCount))
{
Spec_.SetUseBlockInput();
+ Spec_.SetInputBlockRepresentation(TMkqlIOSpecs::EBlockRepresentation::BlockStruct);
Spec_.SetIsTableContent();
- Spec_.Init(codecCtx, inputSpec, {}, {}, origStructType, {}, TString());
+ Spec_.Init(codecCtx, inputSpec, {}, {}, AS_TYPE(TListType, listType)->GetItemType(), {}, TString());
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- return ctx.HolderFactory.Create<TFileWideBlockStreamValue>(Spec_, ctx.HolderFactory, Files_, Decompress_, 4, 1_MB, ExpectedRowCount_);
+ return ctx.HolderFactory.Create<TFileListValue>(Spec_, ctx.HolderFactory, Files_, Decompress_, 4, 1_MB, ExpectedRowCount_);
}
private:
@@ -47,15 +48,14 @@ private:
IComputationNode* WrapYtBlockTableContent(NCommon::TCodecContext& codecCtx,
TComputationMutables& mutables, TCallable& callable, TStringBuf pathPrefix)
{
- MKQL_ENSURE(callable.GetInputsCount() == 6, "Expected 6 arguments");
+ MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 arguments");
TString uniqueId(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef());
- auto origStructType = AS_TYPE(TStructType, AS_VALUE(TTypeType, callable.GetInput(1)));
- const ui32 tablesCount = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui32>();
- TString inputSpec(AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().AsStringRef());
- const bool decompress = AS_VALUE(TDataLiteral, callable.GetInput(4))->AsValue().Get<bool>();
+ const ui32 tablesCount = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>();
+ TString inputSpec(AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().AsStringRef());
+ const bool decompress = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<bool>();
std::optional<ui64> length;
- TTupleLiteral* lengthTuple = AS_VALUE(TTupleLiteral, callable.GetInput(5));
+ TTupleLiteral* lengthTuple = AS_VALUE(TTupleLiteral, callable.GetInput(4));
if (lengthTuple->GetValuesCount() > 0) {
MKQL_ENSURE(lengthTuple->GetValuesCount() == 1, "Expect 1 element in the length tuple");
length = AS_VALUE(TDataLiteral, lengthTuple->GetValue(0))->AsValue().Get<ui64>();
@@ -67,7 +67,7 @@ IComputationNode* WrapYtBlockTableContent(NCommon::TCodecContext& codecCtx,
}
return new TYtBlockTableContentWrapper(mutables, codecCtx, std::move(files), inputSpec,
- origStructType, decompress, length);
+ callable.GetType()->GetReturnType(), decompress, length);
}
} // NYql
diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp
index d814246f76e..ae32ee6bb54 100644
--- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp
+++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp
@@ -57,7 +57,8 @@ bool TFileInputState::NextValue() {
MkqlReader_.Next();
if (Spec_->UseBlockInput_) {
- auto blockCountValue = CurrentValue_.GetElement(Spec_->Inputs[CurrentInput_]->StructSize);
+ auto blockSizeStructIndex = GetBlockSizeStructIndex(*Spec_, CurrentInput_);
+ auto blockCountValue = CurrentValue_.GetElement(blockSizeStructIndex);
CurrentRecord_ += GetBlockCount(blockCountValue);
} else {
++CurrentRecord_;
diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.cpp
index 7d720cbbd5d..410abc6ca93 100644
--- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.cpp
+++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.cpp
@@ -1,14 +1,16 @@
#include "yql_mkql_file_list.h"
-#include "yql_mkql_file_input_state.h"
+
+#include <yql/essentials/minikql/computation/mkql_block_impl.h>
namespace NYql {
using namespace NKikimr::NMiniKQL;
-TFileListValueBase::TIterator::TIterator(TMemoryUsageInfo* memInfo, THolder<IInputState>&& state, std::optional<ui64> length)
+TFileListValueBase::TIterator::TIterator(TMemoryUsageInfo* memInfo, const TMkqlIOSpecs& spec, THolder<TFileInputState>&& state, std::optional<ui64> length)
: TComputationValue(memInfo)
, State_(std::move(state))
, ExpectedLength_(std::move(length))
+ , Spec_(spec)
{
}
@@ -22,19 +24,25 @@ bool TFileListValueBase::TIterator::Next(NUdf::TUnboxedValue& value) {
return false;
}
+ value = State_->GetCurrent();
if (ExpectedLength_) {
MKQL_ENSURE(*ExpectedLength_ > 0, "Invalid file length. State: " << State_->DebugInfo());
- --(*ExpectedLength_);
+ if (Spec_.UseBlockInput_) {
+ auto blockSizeStructIndex = GetBlockSizeStructIndex(Spec_, State_->GetTableIndex());
+ auto blockCountValue = value.GetElement(blockSizeStructIndex);
+ (*ExpectedLength_) -= GetBlockCount(blockCountValue);
+ } else {
+ --(*ExpectedLength_);
+ }
}
- value = State_->GetCurrent();
return true;
}
NUdf::TUnboxedValue TFileListValueBase::GetListIterator() const {
- return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), MakeState(), Length));
+ return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), Spec, MakeState(), Length));
}
-THolder<IInputState> TFileListValue::MakeState() const {
+THolder<TFileInputState> TFileListValue::MakeState() const {
return MakeHolder<TFileInputState>(Spec, HolderFactory, MakeMkqlFileInputs(FilePaths, Decompress), BlockCount, BlockSize);
}
diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.h b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.h
index 912a083efe5..aa0b8d184c9 100644
--- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.h
+++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_list.h
@@ -3,6 +3,7 @@
#include "yql_mkql_input_stream.h"
#include <yt/yql/providers/yt/codec/yt_codec.h>
+#include <yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h>
#include <yql/essentials/minikql/computation/mkql_computation_node.h>
#include <yql/essentials/minikql/computation/mkql_custom_list.h>
@@ -28,19 +29,21 @@ public:
protected:
class TIterator : public NKikimr::NMiniKQL::TComputationValue<TIterator> {
public:
- TIterator(NKikimr::NMiniKQL::TMemoryUsageInfo* memInfo, THolder<IInputState>&& state, std::optional<ui64> length);
+ TIterator(NKikimr::NMiniKQL::TMemoryUsageInfo* memInfo, const TMkqlIOSpecs& spec, THolder<TFileInputState>&& state, std::optional<ui64> length);
private:
bool Next(NUdf::TUnboxedValue& value) override;
bool AtStart_ = true;
- THolder<IInputState> State_;
+ THolder<TFileInputState> State_;
std::optional<ui64> ExpectedLength_;
+
+ const TMkqlIOSpecs& Spec_;
};
NUdf::TUnboxedValue GetListIterator() const override;
- virtual THolder<IInputState> MakeState() const = 0;
+ virtual THolder<TFileInputState> MakeState() const = 0;
protected:
const TMkqlIOSpecs& Spec;
@@ -66,7 +69,7 @@ public:
}
protected:
- THolder<IInputState> MakeState() const override;
+ THolder<TFileInputState> MakeState() const override;
private:
const TVector<TString> FilePaths;
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp
index 65453c8137e..889c06436f7 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp
@@ -102,7 +102,7 @@ public:
}
protected:
- THolder<IInputState> MakeState() const override {
+ THolder<TFileInputState> MakeState() const override {
return MakeHolder<TFileInputStateWithTableState>(Spec, HolderFactory, MakeTextYsonInputs(TablePaths_),
0u, 1_MB, TTableState(TableState_));
}
diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
index f1ffba0d200..2296595dc7e 100644
--- a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
+++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
@@ -639,13 +639,7 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) {
output.Ref(), itemsCount, ctx, true);
}
- return ctx.ProgramBuilder.WideToBlocks(ctx.ProgramBuilder.FromFlow(ctx.ProgramBuilder.ExpandMap(ctx.ProgramBuilder.ToFlow(values), [&](TRuntimeNode item) -> TRuntimeNode::TList {
- TRuntimeNode::TList result;
- for (auto& origItem : origItemStructType->GetItems()) {
- result.push_back(ctx.ProgramBuilder.Member(item, origItem->GetName()));
- }
- return result;
- })));
+ return ctx.ProgramBuilder.ListToBlocks(values);
});
compiler.AddCallable({TYtSort::CallableName(), TYtCopy::CallableName(), TYtMerge::CallableName()},
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp
index 26af2460bf8..18e6204a58d 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp
@@ -82,11 +82,7 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) {
if (EPhase::Content == Phase_ || EPhase::All == Phase_) {
return [&, name, useBlocks](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {
- if (useBlocks) {
- YQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
- } else {
- YQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args");
- }
+ YQL_ENSURE(callable.GetInputsCount() == 3, "Expected 3 args");
const TString cluster = ExecCtx_.Cluster_;
const TString tmpFolder = GetTablesTmpFolder(*Settings_);
@@ -331,7 +327,6 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) {
callable.GetType()->GetReturnType());
if (useBlocks) {
call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(uniqueId));
- call.Add(callable.GetInput(3)); // orig struct type
call.Add(PgmBuilder_.NewDataLiteral(tableList->GetItemsCount()));
call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(NYT::NodeToYsonString(specNode)));
call.Add(PgmBuilder_.NewDataLiteral(ETableContentDeliveryMode::File == deliveryMode)); // use compression
diff --git a/yt/yql/providers/yt/job/yql_job_user.cpp b/yt/yql/providers/yt/job/yql_job_user.cpp
index f9e923ccffc..0b355e6609d 100644
--- a/yt/yql/providers/yt/job/yql_job_user.cpp
+++ b/yt/yql/providers/yt/job/yql_job_user.cpp
@@ -171,6 +171,7 @@ void TYqlUserJob::DoImpl(const TFile& inHandle, const TVector<TFile>& outHandles
}
if (UseBlockInput) {
MkqlIOSpecs->SetUseBlockInput();
+ MkqlIOSpecs->SetInputBlockRepresentation(TMkqlIOSpecs::EBlockRepresentation::WideBlock);
}
if (UseBlockOutput) {
MkqlIOSpecs->SetUseBlockOutput();
diff --git a/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp b/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp
index a55d9baf659..8d6b31a1571 100644
--- a/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp
+++ b/yt/yql/providers/yt/lib/expr_traits/yql_expr_traits.cpp
@@ -372,6 +372,7 @@ namespace NYql {
TStringBuf("Last"),
TStringBuf("ToDict"),
TStringBuf("SqueezeToDict"),
+ TStringBuf("BlockStorage"),
TStringBuf("Iterator"), // Why?
TStringBuf("Collect"),
TStringBuf("Length"),
diff --git a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp
index 5f0f3e07396..37313115425 100644
--- a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp
@@ -68,46 +68,19 @@ private:
return EnsureWideFlowType(mapLambda.Cast().Args().Arg(0).Ref(), ctx);
}
- TMaybeNode<TExprBase> TryTransformTableContent(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
+ TMaybeNode<TExprBase> TryTransformTableContent(TExprBase node, TExprContext& ctx) const {
auto tableContent = node.Cast<TYtTableContent>();
if (!NYql::HasSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady)) {
return tableContent;
}
- const TParentsMap* parentsMap = getParents();
- if (auto it = parentsMap->find(tableContent.Raw()); it != parentsMap->end() && it->second.size() > 1) {
- return tableContent;
- }
-
YQL_CLOG(INFO, ProviderYt) << "Rewrite YtTableContent with block input";
- auto inputStructType = GetSeqItemType(tableContent.Ref().GetTypeAnn())->Cast<TStructExprType>();
- auto asStructBuilder = Build<TCoAsStruct>(ctx, tableContent.Pos());
- TExprNode::TListType narrowMapArgs;
- for (auto& item : inputStructType->GetItems()) {
- auto arg = ctx.NewArgument(tableContent.Pos(), item->GetName());
- asStructBuilder.Add<TCoNameValueTuple>()
- .Name().Build(item->GetName())
- .Value(arg)
- .Build();
- narrowMapArgs.push_back(std::move(arg));
- }
-
auto settings = RemoveSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady, ctx);
- return Build<TCoForwardList>(ctx, tableContent.Pos())
- .Stream<TCoNarrowMap>()
- .Input<TCoToFlow>()
- .Input<TCoWideFromBlocks>()
- .Input<TYtBlockTableContent>()
- .Input(tableContent.Input())
- .Settings(settings)
- .Build()
- .Build()
- .Build()
- .Lambda()
- .Args(narrowMapArgs)
- .Body(asStructBuilder.Done())
- .Build()
+ return Build<TCoListFromBlocks>(ctx, tableContent.Pos())
+ .Input<TYtBlockTableContent>()
+ .Input(tableContent.Input())
+ .Settings(settings)
.Build()
.Done();
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp b/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp
index fcad1669af7..9e3bd1fbc2c 100644
--- a/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp
@@ -189,42 +189,9 @@ public:
return TStatus::Ok;
}
- TStatus HandleBlockTableContent(TExprBase input, TExprContext& ctx) {
+ TStatus HandleBlockTableContent(TExprBase input, TExprContext& /*ctx*/) {
TYtBlockTableContent tableContent = input.Cast<TYtBlockTableContent>();
-
- auto listType = tableContent.Input().Maybe<TYtOutput>()
- ? tableContent.Input().Ref().GetTypeAnn()
- : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back();
- auto itemStructType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
-
- auto pathRename = [&](TPartOfConstraintBase::TPathType path) -> std::vector<TPartOfConstraintBase::TPathType> {
- YQL_ENSURE(!path.empty());
-
- auto fieldIndex = itemStructType->FindItem(path[0]);
- YQL_ENSURE(fieldIndex.Defined());
-
- path[0] = ctx.GetIndexAsString(*fieldIndex);
- return { path };
- };
-
- TConstraintSet wideConstraints;
- for (auto constraint : tableContent.Input().Ref().GetAllConstraints()) {
- if (auto empty = dynamic_cast<const TEmptyConstraintNode*>(constraint)) {
- wideConstraints.AddConstraint(ctx.MakeConstraint<TEmptyConstraintNode>());
- } else if (auto sorted = dynamic_cast<const TSortedConstraintNode*>(constraint)) {
- wideConstraints.AddConstraint(sorted->RenameFields(ctx, pathRename));
- } else if (auto chopped = dynamic_cast<const TChoppedConstraintNode*>(constraint)) {
- wideConstraints.AddConstraint(chopped->RenameFields(ctx, pathRename));
- } else if (auto unique = dynamic_cast<const TUniqueConstraintNode*>(constraint)) {
- wideConstraints.AddConstraint(unique->RenameFields(ctx, pathRename));
- } else if (auto distinct = dynamic_cast<const TDistinctConstraintNode*>(constraint)) {
- wideConstraints.AddConstraint(distinct->RenameFields(ctx, pathRename));
- } else {
- YQL_ENSURE(false, "unexpected constraint");
- }
- }
-
- input.Ptr()->SetConstraints(wideConstraints);
+ input.Ptr()->CopyConstraints(tableContent.Input().Ref());
return TStatus::Ok;
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp b/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp
index 69dec1f558d..84c811cc816 100644
--- a/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp
@@ -898,14 +898,26 @@ public:
auto listType = tableContent.Input().Maybe<TYtOutput>()
? tableContent.Input().Ref().GetTypeAnn()
: tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back();
- auto itemStructType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
+ auto tableStructType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
+
+ TVector<const TItemExprType*> outputStructItems;
+ for (auto item : tableStructType->GetItems()) {
+ auto itemType = item->GetItemType();
+ if (itemType->IsBlockOrScalar()) {
+ ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), "Input type should not be a block or scalar"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureSupportedAsBlockType(input.Pos(), *itemType, ctx, *State_->Types)) {
+ return IGraphTransformer::TStatus::Error;
+ }
- TTypeAnnotationNode::TListType multiTypeItems;
- for (auto& item: itemStructType->GetItems()) {
- multiTypeItems.emplace_back(ctx.MakeType<TBlockExprType>(item->GetItemType()));
+ outputStructItems.push_back(ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(itemType)));
}
- multiTypeItems.push_back(ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)));
- input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TMultiExprType>(multiTypeItems)));
+ outputStructItems.push_back(ctx.MakeType<TItemExprType>(BlockLengthColumnName, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64))));
+
+ auto outputStructType = ctx.MakeType<TStructExprType>(outputStructItems);
+ input.Ptr()->SetTypeAnn(ctx.MakeType<TListExprType>(outputStructType));
if (auto columnOrder = State_->Types->LookupColumnOrder(tableContent.Input().Ref())) {
return State_->Types->SetColumnOrder(input.Ref(), *columnOrder, ctx);
diff --git a/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp b/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp
index 0c3a1a208ba..a6c34dc8155 100644
--- a/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp
@@ -232,23 +232,13 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName,
samplingTupleItems.push_back(ctx.ProgramBuilder.NewDataLiteral(isSystemSampling));
}
- TType* outType = nullptr;
if (useBlocks) {
- auto structType = AS_TYPE(TStructType, outItemType);
-
- std::vector<TType*> outputItems;
- outputItems.reserve(structType->GetMembersCount());
- for (size_t i = 0; i < structType->GetMembersCount(); i++) {
- outputItems.push_back(ctx.ProgramBuilder.NewBlockType(structType->GetMemberType(i), TBlockType::EShape::Many));
- }
- outputItems.push_back(ctx.ProgramBuilder.NewBlockType(ctx.ProgramBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar));
- outType = ctx.ProgramBuilder.NewStreamType(ctx.ProgramBuilder.NewMultiType(outputItems));
-
- } else {
- outType = ctx.ProgramBuilder.NewListType(outItemType);
+ outItemType = ctx.ProgramBuilder.BuildBlockStructType(AS_TYPE(TStructType, outItemType));
}
- TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), callName, outType);
+ auto outListType = ctx.ProgramBuilder.NewListType(outItemType);
+
+ TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), callName, outListType);
call.Add(ctx.ProgramBuilder.NewList(listTypeGroup, groups));
call.Add(ctx.ProgramBuilder.NewTuple(samplingTupleItems));
@@ -259,10 +249,6 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName,
call.Add(ctx.ProgramBuilder.NewEmptyTuple());
}
- if (useBlocks) {
- call.Add(TRuntimeNode(outItemType, true));
- }
-
auto res = TRuntimeNode(call.Build(), false);
if (settings) {
@@ -505,8 +491,8 @@ void RegisterYtMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) {
[](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
TYtBlockTableContent tableContent(&node);
if (node.GetConstraint<TEmptyConstraintNode>()) {
- const auto streamType = ctx.BuildType(node, *node.GetTypeAnn());
- return ctx.ProgramBuilder.EmptyIterator(streamType);
+ const auto itemType = ctx.BuildType(node, GetSeqItemType(*node.GetTypeAnn()));
+ return ctx.ProgramBuilder.NewEmptyList(itemType);
}
auto origItemStructType = (