aboutsummaryrefslogtreecommitdiffstats
path: root/yt/yql
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2024-12-24 23:15:05 +0000
committerAlexander Smirnov <alex@ydb.tech>2024-12-24 23:15:05 +0000
commit59c9675625adf036a007e8e7db9ef26cbe183626 (patch)
tree1ccd4c7726af8dc3582d51627a689dacb5859df1 /yt/yql
parentbd0e2de0b1035962a4d5b9e847eaa6508fad7fcf (diff)
parent75f1af270a6cf9a17b65fde6d12efbb94f235960 (diff)
downloadydb-59c9675625adf036a007e8e7db9ef26cbe183626.tar.gz
Merge branch 'rightlib' into merge-libs-241224-2313
Diffstat (limited to 'yt/yql')
-rw-r--r--yt/yql/providers/yt/codec/ya.make1
-rw-r--r--yt/yql/providers/yt/codec/yt_codec.cpp4
-rw-r--r--yt/yql/providers/yt/common/yql_names.h1
-rw-r--r--yt/yql/providers/yt/gateway/lib/yt_helpers.cpp2
-rw-r--r--yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp79
-rw-r--r--yt/yql/providers/yt/lib/row_spec/yql_row_spec.h2
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp5
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_table.cpp3
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_table.h2
9 files changed, 89 insertions, 10 deletions
diff --git a/yt/yql/providers/yt/codec/ya.make b/yt/yql/providers/yt/codec/ya.make
index 036561f455..0bbae8552b 100644
--- a/yt/yql/providers/yt/codec/ya.make
+++ b/yt/yql/providers/yt/codec/ya.make
@@ -18,6 +18,7 @@ PEERDIR(
yt/cpp/mapreduce/interface
yt/cpp/mapreduce/io
contrib/libs/apache/arrow
+ yql/essentials/core
yql/essentials/minikql
yql/essentials/minikql/computation
yql/essentials/public/udf
diff --git a/yt/yql/providers/yt/codec/yt_codec.cpp b/yt/yql/providers/yt/codec/yt_codec.cpp
index a4811f12b4..86381e1920 100644
--- a/yt/yql/providers/yt/codec/yt_codec.cpp
+++ b/yt/yql/providers/yt/codec/yt_codec.cpp
@@ -10,6 +10,7 @@
#include <yql/essentials/minikql/mkql_node_builder.h>
#include <yql/essentials/minikql/mkql_string_util.h>
#include <yql/essentials/utils/yql_panic.h>
+#include <yql/essentials/core/yql_type_annotation.h>
#include <library/cpp/yson/node/node_io.h>
@@ -411,8 +412,9 @@ void TMkqlIOSpecs::InitInput(NCommon::TCodecContext& codecCtx,
bool useCommonColumns = true;
THashMap<TString, ui32> structColumns;
if (columns.Defined()) {
+ TColumnOrder order(*columns);
for (size_t i = 0; i < columns->size(); ++i) {
- structColumns.insert({columns->at(i), (ui32)i});
+ structColumns.insert({order.at(i).PhysicalName, (ui32)i});
}
}
else if (itemType && InputGroups.empty()) {
diff --git a/yt/yql/providers/yt/common/yql_names.h b/yt/yql/providers/yt/common/yql_names.h
index e8ca0c8222..4d35a04c15 100644
--- a/yt/yql/providers/yt/common/yql_names.h
+++ b/yt/yql/providers/yt/common/yql_names.h
@@ -22,6 +22,7 @@ const TStringBuf RowSpecAttrUseNativeYtTypes = "UseNativeYtTypes";
const TStringBuf RowSpecAttrNativeYtTypeFlags = "NativeYtTypeFlags";
const TStringBuf RowSpecAttrExplicitYson = "ExplicitYson";
const TStringBuf RowSpecAttrConstraints = "Constraints";
+const TStringBuf RowSpecAttrColumnOrder = "ColumnOrder";
const TStringBuf YqlReadUdfAttribute = "_yql_read_udf";
const TStringBuf YqlReadUdfTypeConfigAttribute = "_yql_read_udf_type_config";
diff --git a/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp b/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp
index 494bce164e..6ad53ec6d9 100644
--- a/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp
+++ b/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp
@@ -354,7 +354,7 @@ static bool IterateRows(NYT::ITransactionPtr tx,
if (!YAMRED_DSV && exec.GetColumns()) {
if (!specsCache.GetSpecs().Inputs[tableIndex]->OthersStructIndex) {
- path.Columns(*exec.GetColumns());
+ path.Columns(TColumnOrder(*exec.GetColumns()).GetPhysicalNames());
}
}
diff --git a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp
index 10ee1b54ad..e23bc48c80 100644
--- a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp
+++ b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp
@@ -161,6 +161,7 @@ bool TYqlRowSpecInfo::Parse(const NYT::TNode& rowSpecAttr, TExprContext& ctx, co
if (!ParseType(rowSpecAttr, ctx, pos) || !ParseSort(rowSpecAttr, ctx, pos)) {
return false;
}
+ ParseColumnOrder(rowSpecAttr);
ParseFlags(rowSpecAttr);
ParseDefValues(rowSpecAttr);
ParseConstraints(rowSpecAttr);
@@ -279,6 +280,7 @@ bool TYqlRowSpecInfo::ParsePatched(const NYT::TNode& rowSpecAttr, const THashMap
return false;
}
+ ParseColumnOrder(rowSpecAttr);
ParseFlags(rowSpecAttr);
ParseDefValues(rowSpecAttr);
ParseConstraints(rowSpecAttr);
@@ -290,6 +292,7 @@ bool TYqlRowSpecInfo::ParseFull(const NYT::TNode& rowSpecAttr, const THashMap<TS
if (!ParseType(rowSpecAttr, ctx, pos) || !ParseSort(rowSpecAttr, ctx, pos)) {
return false;
}
+ ParseColumnOrder(rowSpecAttr);
ParseFlags(rowSpecAttr);
ParseDefValues(rowSpecAttr);
ParseConstraints(rowSpecAttr);
@@ -400,15 +403,34 @@ bool TYqlRowSpecInfo::Parse(const THashMap<TString, TString>& attrs, TExprContex
return Validate(ctx, pos);
}
+void TYqlRowSpecInfo::ParseColumnOrder(const NYT::TNode& rowSpecAttr) {
+ if (rowSpecAttr.HasKey(RowSpecAttrColumnOrder)) {
+ TColumnOrder columns;
+ auto columnOrderAttr = rowSpecAttr[RowSpecAttrColumnOrder];
+ if (!columnOrderAttr.IsList()) {
+ YQL_LOG_CTX_THROW yexception() << "Row spec has invalid column order";
+ }
+ for (const auto& name: columnOrderAttr.AsList()) {
+ if (!name.IsString()) {
+ YQL_LOG_CTX_THROW yexception() << "Row spec has invalid column order";
+ }
+ columns.AddColumn(name.AsString());
+ }
+ Columns = columns;
+ }
+}
+
bool TYqlRowSpecInfo::ParseType(const NYT::TNode& rowSpecAttr, TExprContext& ctx, const TPositionHandle& pos) {
if (!rowSpecAttr.HasKey(RowSpecAttrType)) {
YQL_LOG_CTX_THROW yexception() << "Row spec doesn't have mandatory Type attribute";
}
TColumnOrder columns;
+
auto type = NCommon::ParseOrderAwareTypeFromYson(rowSpecAttr[RowSpecAttrType], columns, ctx, ctx.GetPosition(pos));
if (!type) {
return false;
}
+
if (type->GetKind() != ETypeAnnotationKind::Struct) {
YQL_LOG_CTX_THROW yexception() << "Row spec defines not a struct type";
}
@@ -694,6 +716,19 @@ bool TYqlRowSpecInfo::Validate(const TExprNode& node, TExprContext& ctx, const T
return false;
}
type = rawType->Cast<TStructExprType>();
+ } else if (name->Content() == RowSpecAttrColumnOrder) {
+ if (!EnsureTuple(*value, ctx)) {
+ return false;
+ }
+ TColumnOrder order;
+ for (const TExprNode::TPtr& item: value->Children()) {
+ if (!EnsureAtom(*item, ctx)) {
+ return false;
+ }
+ order.AddColumn(TString(item->Content()));
+ }
+ columnOrder = order;
+
} else if (name->Content() == RowSpecAttrSortedBy) {
if (!EnsureTuple(*value, ctx)) {
return false;
@@ -783,6 +818,18 @@ bool TYqlRowSpecInfo::Validate(const TExprNode& node, TExprContext& ctx, const T
<< " option is mandatory for " << TYqlRowSpec::CallableName()));
return false;
}
+ if (columnOrder) {
+ if (columnOrder->Size() != type->GetSize()) {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Column order size " << columnOrder->Size()
+ << " != " << type->GetSize() << " (type size)"));
+ }
+ for (auto& [_, name]: *columnOrder) {
+ if (!type->FindItem(name)) {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Column " << name.Quote()
+ << " from column order isn't present in type"));
+ }
+ }
+ }
if (sortedBy.size() != sortDirectionsCount) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << TString{RowSpecAttrSortDirections}.Quote()
<< " should have the same size as " << TString{RowSpecAttrSortedBy}.Quote()));
@@ -911,6 +958,13 @@ void TYqlRowSpecInfo::Parse(NNodes::TExprBase node, bool withTypes) {
}
Type = node.Ref().GetTypeAnn()->Cast<TStructExprType>();
}
+ } else if (setting.Name().Value() == RowSpecAttrColumnOrder) {
+ auto& val = setting.Value().Cast().Ref();
+ TColumnOrder order;
+ for (const TExprNode::TPtr& item: val.Children()) {
+ order.AddColumn(TString(item->Content()));
+ }
+ Columns = order;
} else if (setting.Name().Value() == RowSpecAttrConstraints) {
ConstraintsNode = NYT::NodeFromYsonString(setting.Value().Cast().Ref().Content());
} else if (setting.Name().Value() == RowSpecAttrSortedBy) {
@@ -989,6 +1043,17 @@ void TYqlRowSpecInfo::SetColumnOrder(const TMaybe<TColumnOrder>& columns) {
Columns = columns;
}
+void TYqlRowSpecInfo::FillColumnOrder(NYT::TNode& attrs) const {
+ if (!Columns || !Columns->HasDuplicates()) {
+ return;
+ }
+ NYT::TNode order = NYT::TNode::CreateList();
+ for (const auto &name: Columns->GetLogicalNames()) {
+ order.Add(name);
+ }
+ attrs[RowSpecAttrColumnOrder] = order;
+}
+
TString TYqlRowSpecInfo::ToYsonString() const {
NYT::TNode attrs = NYT::TNode::CreateMap();
FillCodecNode(attrs[YqlRowSpecAttribute]);
@@ -1023,14 +1088,14 @@ void TYqlRowSpecInfo::CopyTypeOrders(const NYT::TNode& typeNode) {
if (!StrictSchema && name == YqlOthersColumnName) {
continue;
}
- auto origType = Type->FindItemType(name);
+ auto origType = Type->FindItemType(gen_name);
YQL_ENSURE(origType);
auto origTypeNode = NCommon::TypeToYsonNode(origType);
- auto it = fromMembers.find(name);
+ auto it = fromMembers.find(gen_name);
if (it == fromMembers.end() || !NCommon::EqualsYsonTypesIgnoreStructOrder(origTypeNode, it->second)) {
- members.Add(NYT::TNode::CreateList().Add(name).Add(origTypeNode));
+ members.Add(NYT::TNode::CreateList().Add(gen_name).Add(origTypeNode));
} else {
- members.Add(NYT::TNode::CreateList().Add(name).Add(it->second));
+ members.Add(NYT::TNode::CreateList().Add(gen_name).Add(it->second));
}
}
@@ -1098,6 +1163,7 @@ void TYqlRowSpecInfo::FillSort(NYT::TNode& attrs, const NCommon::TStructMemberMa
}
}
}
+
if (!curSortedBy->empty()) {
attrs[RowSpecAttrUniqueKeys] = curUniqueKeys;
}
@@ -1191,6 +1257,7 @@ void TYqlRowSpecInfo::FillCodecNode(NYT::TNode& attrs, const NCommon::TStructMem
FillDefValues(attrs, mapper);
FillFlags(attrs);
FillExplicitYson(attrs, mapper);
+ FillColumnOrder(attrs);
}
void TYqlRowSpecInfo::FillAttrNode(NYT::TNode& attrs, ui64 nativeTypeCompatibility, bool useCompactForm) const {
@@ -1230,6 +1297,7 @@ void TYqlRowSpecInfo::FillAttrNode(NYT::TNode& attrs, ui64 nativeTypeCompatibili
if (!useCompactForm || HasAuxColumns() || AnyOf(SortedBy, [&patchedFields](const auto& name) { return patchedFields.contains(name); } )) {
FillSort(attrs);
}
+ FillColumnOrder(attrs);
FillDefValues(attrs);
FillFlags(attrs);
FillConstraints(attrs);
@@ -1327,6 +1395,9 @@ NNodes::TExprBase TYqlRowSpecInfo::ToExprNode(TExprContext& ctx, const TPosition
saveColumnList(RowSpecAttrSortMembers, SortMembers);
saveColumnList(RowSpecAttrSortedBy, SortedBy);
+ if (Columns && Columns->HasDuplicates()) {
+ saveColumnList(RowSpecAttrColumnOrder, Columns->GetLogicalNames());
+ }
if (!SortedByTypes.empty()) {
auto listBuilder = Build<TExprList>(ctx, pos);
diff --git a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h
index 049b67d2a6..3cbf5ba90a 100644
--- a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h
+++ b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h
@@ -50,6 +50,7 @@ struct TYqlRowSpecInfo: public TThrRefBase {
TString ToYsonString() const;
void FillCodecNode(NYT::TNode& attrs, const NCommon::TStructMemberMapper& mapper = {}) const;
void FillAttrNode(NYT::TNode& attrs, ui64 nativeTypeCompatibility, bool useCompactForm) const;
+ void FillColumnOrder(NYT::TNode& attr) const;
NNodes::TExprBase ToExprNode(TExprContext& ctx, const TPositionHandle& pos) const;
bool IsSorted() const {
@@ -129,6 +130,7 @@ private:
bool ParsePatched(const NYT::TNode& rowSpecAttr, const THashMap<TString, TString>& attrs, TExprContext& ctx, const TPositionHandle& pos);
bool ParseFull(const NYT::TNode& rowSpecAttr, const THashMap<TString, TString>& attrs, TExprContext& ctx, const TPositionHandle& pos);
bool ParseType(const NYT::TNode& rowSpecAttr, TExprContext& ctx, const TPositionHandle& pos);
+ void ParseColumnOrder(const NYT::TNode& rowSpecAttr);
bool ParseSort(const NYT::TNode& rowSpecAttr, TExprContext& ctx, const TPositionHandle& pos);
void ParseFlags(const NYT::TNode& rowSpecAttr);
void ParseConstraints(const NYT::TNode& rowSpecAttr);
diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
index b8cfc3c087..b8891b1208 100644
--- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
@@ -825,7 +825,7 @@ public:
YQL_CLOG(INFO, ProviderYt) << "DQ annotate: adding yt.write=" << param;
}
- bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) override {
+ bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams, const TMaybe<TColumnOrder>& order) override {
const auto resOrPull = TResOrPullBase(&root);
if (FromString<bool>(resOrPull.Discard().Value())) {
@@ -853,8 +853,9 @@ public:
}
const auto type = GetSequenceItemType(input->Pos(), input->GetTypeAnn(), false, ctx);
+
YQL_ENSURE(type);
- TYtOutTableInfo outTableInfo(type->Cast<TStructExprType>(), State_->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE);
+ TYtOutTableInfo outTableInfo(type->Cast<TStructExprType>(), State_->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE, order);
const auto res = State_->Gateway->PrepareFullResultTable(
IYtGateway::TFullResultTableOptions(State_->SessionId)
diff --git a/yt/yql/providers/yt/provider/yql_yt_table.cpp b/yt/yql/providers/yt/provider/yql_yt_table.cpp
index d793aee0a1..8be885b84c 100644
--- a/yt/yql/providers/yt/provider/yql_yt_table.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_table.cpp
@@ -900,9 +900,10 @@ bool TYtTableInfo::HasSubstAnonymousLabel(NNodes::TExprBase node) {
/////////////////////////////////////////////////////////////////////////////////////////////////////////
-TYtOutTableInfo::TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags) {
+TYtOutTableInfo::TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags, const TMaybe<TColumnOrder>& columnOrder) {
RowSpec = MakeIntrusive<TYqlRowSpecInfo>();
RowSpec->SetType(type, nativeYtTypeFlags);
+ RowSpec->SetColumnOrder(columnOrder);
Meta = MakeIntrusive<TYtTableMetaInfo>();
Meta->CanWrite = true;
diff --git a/yt/yql/providers/yt/provider/yql_yt_table.h b/yt/yql/providers/yt/provider/yql_yt_table.h
index 9675ce7c74..bae060b89d 100644
--- a/yt/yql/providers/yt/provider/yql_yt_table.h
+++ b/yt/yql/providers/yt/provider/yql_yt_table.h
@@ -157,7 +157,7 @@ struct TYtOutTableInfo: public TYtTableBaseInfo {
TYtOutTableInfo() {
IsTemp = true;
}
- TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags);
+ TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags, const TMaybe<TColumnOrder>& columnOrder = {});
TYtOutTableInfo(NNodes::TExprBase node) {
Parse(node);
IsTemp = true;