diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-12-24 23:15:05 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-12-24 23:15:05 +0000 |
commit | 59c9675625adf036a007e8e7db9ef26cbe183626 (patch) | |
tree | 1ccd4c7726af8dc3582d51627a689dacb5859df1 /yt/yql | |
parent | bd0e2de0b1035962a4d5b9e847eaa6508fad7fcf (diff) | |
parent | 75f1af270a6cf9a17b65fde6d12efbb94f235960 (diff) | |
download | ydb-59c9675625adf036a007e8e7db9ef26cbe183626.tar.gz |
Merge branch 'rightlib' into merge-libs-241224-2313
Diffstat (limited to 'yt/yql')
-rw-r--r-- | yt/yql/providers/yt/codec/ya.make | 1 | ||||
-rw-r--r-- | yt/yql/providers/yt/codec/yt_codec.cpp | 4 | ||||
-rw-r--r-- | yt/yql/providers/yt/common/yql_names.h | 1 | ||||
-rw-r--r-- | yt/yql/providers/yt/gateway/lib/yt_helpers.cpp | 2 | ||||
-rw-r--r-- | yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp | 79 | ||||
-rw-r--r-- | yt/yql/providers/yt/lib/row_spec/yql_row_spec.h | 2 | ||||
-rw-r--r-- | yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp | 5 | ||||
-rw-r--r-- | yt/yql/providers/yt/provider/yql_yt_table.cpp | 3 | ||||
-rw-r--r-- | yt/yql/providers/yt/provider/yql_yt_table.h | 2 |
9 files changed, 89 insertions, 10 deletions
diff --git a/yt/yql/providers/yt/codec/ya.make b/yt/yql/providers/yt/codec/ya.make index 036561f455..0bbae8552b 100644 --- a/yt/yql/providers/yt/codec/ya.make +++ b/yt/yql/providers/yt/codec/ya.make @@ -18,6 +18,7 @@ PEERDIR( yt/cpp/mapreduce/interface yt/cpp/mapreduce/io contrib/libs/apache/arrow + yql/essentials/core yql/essentials/minikql yql/essentials/minikql/computation yql/essentials/public/udf diff --git a/yt/yql/providers/yt/codec/yt_codec.cpp b/yt/yql/providers/yt/codec/yt_codec.cpp index a4811f12b4..86381e1920 100644 --- a/yt/yql/providers/yt/codec/yt_codec.cpp +++ b/yt/yql/providers/yt/codec/yt_codec.cpp @@ -10,6 +10,7 @@ #include <yql/essentials/minikql/mkql_node_builder.h> #include <yql/essentials/minikql/mkql_string_util.h> #include <yql/essentials/utils/yql_panic.h> +#include <yql/essentials/core/yql_type_annotation.h> #include <library/cpp/yson/node/node_io.h> @@ -411,8 +412,9 @@ void TMkqlIOSpecs::InitInput(NCommon::TCodecContext& codecCtx, bool useCommonColumns = true; THashMap<TString, ui32> structColumns; if (columns.Defined()) { + TColumnOrder order(*columns); for (size_t i = 0; i < columns->size(); ++i) { - structColumns.insert({columns->at(i), (ui32)i}); + structColumns.insert({order.at(i).PhysicalName, (ui32)i}); } } else if (itemType && InputGroups.empty()) { diff --git a/yt/yql/providers/yt/common/yql_names.h b/yt/yql/providers/yt/common/yql_names.h index e8ca0c8222..4d35a04c15 100644 --- a/yt/yql/providers/yt/common/yql_names.h +++ b/yt/yql/providers/yt/common/yql_names.h @@ -22,6 +22,7 @@ const TStringBuf RowSpecAttrUseNativeYtTypes = "UseNativeYtTypes"; const TStringBuf RowSpecAttrNativeYtTypeFlags = "NativeYtTypeFlags"; const TStringBuf RowSpecAttrExplicitYson = "ExplicitYson"; const TStringBuf RowSpecAttrConstraints = "Constraints"; +const TStringBuf RowSpecAttrColumnOrder = "ColumnOrder"; const TStringBuf YqlReadUdfAttribute = "_yql_read_udf"; const TStringBuf YqlReadUdfTypeConfigAttribute = "_yql_read_udf_type_config"; diff --git a/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp b/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp index 494bce164e..6ad53ec6d9 100644 --- a/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp +++ b/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp @@ -354,7 +354,7 @@ static bool IterateRows(NYT::ITransactionPtr tx, if (!YAMRED_DSV && exec.GetColumns()) { if (!specsCache.GetSpecs().Inputs[tableIndex]->OthersStructIndex) { - path.Columns(*exec.GetColumns()); + path.Columns(TColumnOrder(*exec.GetColumns()).GetPhysicalNames()); } } diff --git a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp index 10ee1b54ad..e23bc48c80 100644 --- a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp +++ b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp @@ -161,6 +161,7 @@ bool TYqlRowSpecInfo::Parse(const NYT::TNode& rowSpecAttr, TExprContext& ctx, co if (!ParseType(rowSpecAttr, ctx, pos) || !ParseSort(rowSpecAttr, ctx, pos)) { return false; } + ParseColumnOrder(rowSpecAttr); ParseFlags(rowSpecAttr); ParseDefValues(rowSpecAttr); ParseConstraints(rowSpecAttr); @@ -279,6 +280,7 @@ bool TYqlRowSpecInfo::ParsePatched(const NYT::TNode& rowSpecAttr, const THashMap return false; } + ParseColumnOrder(rowSpecAttr); ParseFlags(rowSpecAttr); ParseDefValues(rowSpecAttr); ParseConstraints(rowSpecAttr); @@ -290,6 +292,7 @@ bool TYqlRowSpecInfo::ParseFull(const NYT::TNode& rowSpecAttr, const THashMap<TS if (!ParseType(rowSpecAttr, ctx, pos) || !ParseSort(rowSpecAttr, ctx, pos)) { return false; } + ParseColumnOrder(rowSpecAttr); ParseFlags(rowSpecAttr); ParseDefValues(rowSpecAttr); ParseConstraints(rowSpecAttr); @@ -400,15 +403,34 @@ bool TYqlRowSpecInfo::Parse(const THashMap<TString, TString>& attrs, TExprContex return Validate(ctx, pos); } +void TYqlRowSpecInfo::ParseColumnOrder(const NYT::TNode& rowSpecAttr) { + if (rowSpecAttr.HasKey(RowSpecAttrColumnOrder)) { + TColumnOrder columns; + auto columnOrderAttr = rowSpecAttr[RowSpecAttrColumnOrder]; + if (!columnOrderAttr.IsList()) { + YQL_LOG_CTX_THROW yexception() << "Row spec has invalid column order"; + } + for (const auto& name: columnOrderAttr.AsList()) { + if (!name.IsString()) { + YQL_LOG_CTX_THROW yexception() << "Row spec has invalid column order"; + } + columns.AddColumn(name.AsString()); + } + Columns = columns; + } +} + bool TYqlRowSpecInfo::ParseType(const NYT::TNode& rowSpecAttr, TExprContext& ctx, const TPositionHandle& pos) { if (!rowSpecAttr.HasKey(RowSpecAttrType)) { YQL_LOG_CTX_THROW yexception() << "Row spec doesn't have mandatory Type attribute"; } TColumnOrder columns; + auto type = NCommon::ParseOrderAwareTypeFromYson(rowSpecAttr[RowSpecAttrType], columns, ctx, ctx.GetPosition(pos)); if (!type) { return false; } + if (type->GetKind() != ETypeAnnotationKind::Struct) { YQL_LOG_CTX_THROW yexception() << "Row spec defines not a struct type"; } @@ -694,6 +716,19 @@ bool TYqlRowSpecInfo::Validate(const TExprNode& node, TExprContext& ctx, const T return false; } type = rawType->Cast<TStructExprType>(); + } else if (name->Content() == RowSpecAttrColumnOrder) { + if (!EnsureTuple(*value, ctx)) { + return false; + } + TColumnOrder order; + for (const TExprNode::TPtr& item: value->Children()) { + if (!EnsureAtom(*item, ctx)) { + return false; + } + order.AddColumn(TString(item->Content())); + } + columnOrder = order; + } else if (name->Content() == RowSpecAttrSortedBy) { if (!EnsureTuple(*value, ctx)) { return false; @@ -783,6 +818,18 @@ bool TYqlRowSpecInfo::Validate(const TExprNode& node, TExprContext& ctx, const T << " option is mandatory for " << TYqlRowSpec::CallableName())); return false; } + if (columnOrder) { + if (columnOrder->Size() != type->GetSize()) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Column order size " << columnOrder->Size() + << " != " << type->GetSize() << " (type size)")); + } + for (auto& [_, name]: *columnOrder) { + if (!type->FindItem(name)) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Column " << name.Quote() + << " from column order isn't present in type")); + } + } + } if (sortedBy.size() != sortDirectionsCount) { ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << TString{RowSpecAttrSortDirections}.Quote() << " should have the same size as " << TString{RowSpecAttrSortedBy}.Quote())); @@ -911,6 +958,13 @@ void TYqlRowSpecInfo::Parse(NNodes::TExprBase node, bool withTypes) { } Type = node.Ref().GetTypeAnn()->Cast<TStructExprType>(); } + } else if (setting.Name().Value() == RowSpecAttrColumnOrder) { + auto& val = setting.Value().Cast().Ref(); + TColumnOrder order; + for (const TExprNode::TPtr& item: val.Children()) { + order.AddColumn(TString(item->Content())); + } + Columns = order; } else if (setting.Name().Value() == RowSpecAttrConstraints) { ConstraintsNode = NYT::NodeFromYsonString(setting.Value().Cast().Ref().Content()); } else if (setting.Name().Value() == RowSpecAttrSortedBy) { @@ -989,6 +1043,17 @@ void TYqlRowSpecInfo::SetColumnOrder(const TMaybe<TColumnOrder>& columns) { Columns = columns; } +void TYqlRowSpecInfo::FillColumnOrder(NYT::TNode& attrs) const { + if (!Columns || !Columns->HasDuplicates()) { + return; + } + NYT::TNode order = NYT::TNode::CreateList(); + for (const auto &name: Columns->GetLogicalNames()) { + order.Add(name); + } + attrs[RowSpecAttrColumnOrder] = order; +} + TString TYqlRowSpecInfo::ToYsonString() const { NYT::TNode attrs = NYT::TNode::CreateMap(); FillCodecNode(attrs[YqlRowSpecAttribute]); @@ -1023,14 +1088,14 @@ void TYqlRowSpecInfo::CopyTypeOrders(const NYT::TNode& typeNode) { if (!StrictSchema && name == YqlOthersColumnName) { continue; } - auto origType = Type->FindItemType(name); + auto origType = Type->FindItemType(gen_name); YQL_ENSURE(origType); auto origTypeNode = NCommon::TypeToYsonNode(origType); - auto it = fromMembers.find(name); + auto it = fromMembers.find(gen_name); if (it == fromMembers.end() || !NCommon::EqualsYsonTypesIgnoreStructOrder(origTypeNode, it->second)) { - members.Add(NYT::TNode::CreateList().Add(name).Add(origTypeNode)); + members.Add(NYT::TNode::CreateList().Add(gen_name).Add(origTypeNode)); } else { - members.Add(NYT::TNode::CreateList().Add(name).Add(it->second)); + members.Add(NYT::TNode::CreateList().Add(gen_name).Add(it->second)); } } @@ -1098,6 +1163,7 @@ void TYqlRowSpecInfo::FillSort(NYT::TNode& attrs, const NCommon::TStructMemberMa } } } + if (!curSortedBy->empty()) { attrs[RowSpecAttrUniqueKeys] = curUniqueKeys; } @@ -1191,6 +1257,7 @@ void TYqlRowSpecInfo::FillCodecNode(NYT::TNode& attrs, const NCommon::TStructMem FillDefValues(attrs, mapper); FillFlags(attrs); FillExplicitYson(attrs, mapper); + FillColumnOrder(attrs); } void TYqlRowSpecInfo::FillAttrNode(NYT::TNode& attrs, ui64 nativeTypeCompatibility, bool useCompactForm) const { @@ -1230,6 +1297,7 @@ void TYqlRowSpecInfo::FillAttrNode(NYT::TNode& attrs, ui64 nativeTypeCompatibili if (!useCompactForm || HasAuxColumns() || AnyOf(SortedBy, [&patchedFields](const auto& name) { return patchedFields.contains(name); } )) { FillSort(attrs); } + FillColumnOrder(attrs); FillDefValues(attrs); FillFlags(attrs); FillConstraints(attrs); @@ -1327,6 +1395,9 @@ NNodes::TExprBase TYqlRowSpecInfo::ToExprNode(TExprContext& ctx, const TPosition saveColumnList(RowSpecAttrSortMembers, SortMembers); saveColumnList(RowSpecAttrSortedBy, SortedBy); + if (Columns && Columns->HasDuplicates()) { + saveColumnList(RowSpecAttrColumnOrder, Columns->GetLogicalNames()); + } if (!SortedByTypes.empty()) { auto listBuilder = Build<TExprList>(ctx, pos); diff --git a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h index 049b67d2a6..3cbf5ba90a 100644 --- a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h +++ b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h @@ -50,6 +50,7 @@ struct TYqlRowSpecInfo: public TThrRefBase { TString ToYsonString() const; void FillCodecNode(NYT::TNode& attrs, const NCommon::TStructMemberMapper& mapper = {}) const; void FillAttrNode(NYT::TNode& attrs, ui64 nativeTypeCompatibility, bool useCompactForm) const; + void FillColumnOrder(NYT::TNode& attr) const; NNodes::TExprBase ToExprNode(TExprContext& ctx, const TPositionHandle& pos) const; bool IsSorted() const { @@ -129,6 +130,7 @@ private: bool ParsePatched(const NYT::TNode& rowSpecAttr, const THashMap<TString, TString>& attrs, TExprContext& ctx, const TPositionHandle& pos); bool ParseFull(const NYT::TNode& rowSpecAttr, const THashMap<TString, TString>& attrs, TExprContext& ctx, const TPositionHandle& pos); bool ParseType(const NYT::TNode& rowSpecAttr, TExprContext& ctx, const TPositionHandle& pos); + void ParseColumnOrder(const NYT::TNode& rowSpecAttr); bool ParseSort(const NYT::TNode& rowSpecAttr, TExprContext& ctx, const TPositionHandle& pos); void ParseFlags(const NYT::TNode& rowSpecAttr); void ParseConstraints(const NYT::TNode& rowSpecAttr); diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp index b8cfc3c087..b8891b1208 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -825,7 +825,7 @@ public: YQL_CLOG(INFO, ProviderYt) << "DQ annotate: adding yt.write=" << param; } - bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) override { + bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams, const TMaybe<TColumnOrder>& order) override { const auto resOrPull = TResOrPullBase(&root); if (FromString<bool>(resOrPull.Discard().Value())) { @@ -853,8 +853,9 @@ public: } const auto type = GetSequenceItemType(input->Pos(), input->GetTypeAnn(), false, ctx); + YQL_ENSURE(type); - TYtOutTableInfo outTableInfo(type->Cast<TStructExprType>(), State_->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE); + TYtOutTableInfo outTableInfo(type->Cast<TStructExprType>(), State_->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE, order); const auto res = State_->Gateway->PrepareFullResultTable( IYtGateway::TFullResultTableOptions(State_->SessionId) diff --git a/yt/yql/providers/yt/provider/yql_yt_table.cpp b/yt/yql/providers/yt/provider/yql_yt_table.cpp index d793aee0a1..8be885b84c 100644 --- a/yt/yql/providers/yt/provider/yql_yt_table.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_table.cpp @@ -900,9 +900,10 @@ bool TYtTableInfo::HasSubstAnonymousLabel(NNodes::TExprBase node) { ///////////////////////////////////////////////////////////////////////////////////////////////////////// -TYtOutTableInfo::TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags) { +TYtOutTableInfo::TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags, const TMaybe<TColumnOrder>& columnOrder) { RowSpec = MakeIntrusive<TYqlRowSpecInfo>(); RowSpec->SetType(type, nativeYtTypeFlags); + RowSpec->SetColumnOrder(columnOrder); Meta = MakeIntrusive<TYtTableMetaInfo>(); Meta->CanWrite = true; diff --git a/yt/yql/providers/yt/provider/yql_yt_table.h b/yt/yql/providers/yt/provider/yql_yt_table.h index 9675ce7c74..bae060b89d 100644 --- a/yt/yql/providers/yt/provider/yql_yt_table.h +++ b/yt/yql/providers/yt/provider/yql_yt_table.h @@ -157,7 +157,7 @@ struct TYtOutTableInfo: public TYtTableBaseInfo { TYtOutTableInfo() { IsTemp = true; } - TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags); + TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags, const TMaybe<TColumnOrder>& columnOrder = {}); TYtOutTableInfo(NNodes::TExprBase node) { Parse(node); IsTemp = true; |