aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaxim Kovalev <maxkovalev@ydb.tech>2024-04-22 16:59:08 +0300
committerGitHub <noreply@github.com>2024-04-22 16:59:08 +0300
commit6823b419d923d8854473893de9747e7a73c4bc1d (patch)
tree6ac508feaa4202eecfad488e62438d90ebfa7b88
parent0fea310a336608fa290c8efee866be9e83318335 (diff)
downloadydb-6823b419d923d8854473893de9747e7a73c4bc1d.tar.gz
YQL-18225: Fix sorted output when rewritting reduce in hybrid (#3971)
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp57
-rw-r--r--ydb/library/yql/tests/sql/hybrid_file/part7/canondata/result.json12
-rw-r--r--ydb/library/yql/tests/sql/sql2yql/canondata/result.json6
-rw-r--r--ydb/library/yql/tests/sql/suites/produce/reduce_with_assume.sql1
4 files changed, 46 insertions, 30 deletions
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
index 7c7161852c..1828263a72 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
@@ -254,19 +254,6 @@ private:
}
auto newWorld = ApplySyncListToWorld(sort.World().Ptr(), syncList, ctx);
- TExprNode::TPtr direct, selector;
- if (const auto& sorted = TYtOutTableInfo(sort.Output().Item(0)).RowSpec->GetForeignSort(); !sorted.empty()) {
- TExprNode::TListType nodes(sorted.size());
- std::transform(sorted.cbegin(), sorted.cend(), nodes.begin(), [&](const std::pair<TString, bool>& item) { return MakeBool(sort.Pos(), item.second, ctx); });
- direct = nodes.size() > 1U ? ctx.NewList(sort.Pos(), std::move(nodes)) : std::move(nodes.front());
- nodes.resize(sorted.size());
- auto arg = ctx.NewArgument(sort.Pos(), "row");
- std::transform(sorted.cbegin(), sorted.cend(), nodes.begin(), [&](const std::pair<TString, bool>& item) {
- return ctx.NewCallable(sort.Pos(), "Member", {arg, ctx.NewAtom(sort.Pos(), item.first)});
- });
- selector = ctx.NewLambda(sort.Pos(), ctx.NewArguments(sort.Pos(), {std::move(arg)}), nodes.size() > 1U ? ctx.NewList(sort.Pos(), std::move(nodes)) : std::move(nodes.front()));
- }
-
const auto input = Build<TCoToFlow>(ctx, sort.Pos())
.Input<TYtTableContent>()
.Input<TYtReadTable>()
@@ -286,6 +273,7 @@ private:
limit = GetLimitExpr(limitNode, ctx);
}
+ auto [direct, selector] = GetOutputSortSettings(sort, ctx);
auto work = direct && selector ?
limit ?
Build<TCoTopSort>(ctx, sort.Pos())
@@ -605,6 +593,23 @@ private:
.Build()
.Done();
+ auto partitionsByKeys = Build<TCoPartitionsByKeys>(ctx, reduce.Pos())
+ .Input(std::move(input))
+ .KeySelectorLambda(extract)
+ .SortDirections(std::move(sortDirs))
+ .SortKeySelectorLambda(std::move(sortKeys))
+ .ListHandlerLambda(std::move(reducer))
+ .Done().Ptr();
+
+ auto [direct, selector] = GetOutputSortSettings(reduce, ctx);
+ if (direct && selector) {
+ partitionsByKeys = Build<TCoSort>(ctx, reduce.Pos())
+ .Input(std::move(partitionsByKeys))
+ .SortDirections(std::move(direct))
+ .KeySelectorLambda(std::move(selector))
+ .Done().Ptr();
+ }
+
return Build<TYtTryFirst>(ctx, reduce.Pos())
.template First<TYtDqProcessWrite>()
.World(std::move(newWorld))
@@ -617,13 +622,7 @@ private:
.template Program<TCoLambda>()
.Args({})
.template Body<TDqWrite>()
- .template Input<TCoPartitionsByKeys>()
- .Input(std::move(input))
- .KeySelectorLambda(extract)
- .SortDirections(std::move(sortDirs))
- .SortKeySelectorLambda(std::move(sortKeys))
- .ListHandlerLambda(std::move(reducer))
- .Build()
+ .Input(std::move(partitionsByKeys))
.Provider().Value(YtProviderName).Build()
.template Settings<TCoNameValueTupleList>().Build()
.Build()
@@ -674,6 +673,24 @@ private:
return node;
}
+ std::pair<TExprNode::TPtr, TExprNode::TPtr> GetOutputSortSettings(const TYtOutputOpBase& op, TExprContext& ctx) const {
+ TExprNode::TPtr direct, selector;
+ if (const auto& sorted = TYtOutTableInfo(op.Output().Item(0)).RowSpec->GetForeignSort(); !sorted.empty()) {
+ TExprNode::TListType nodes(sorted.size());
+ std::transform(sorted.cbegin(), sorted.cend(), nodes.begin(),
+ [&](const std::pair<TString, bool>& item) { return MakeBool(op.Pos(), item.second, ctx); });
+ direct = nodes.size() > 1U ? ctx.NewList(op.Pos(), std::move(nodes)) : std::move(nodes.front());
+ nodes.resize(sorted.size());
+ auto arg = ctx.NewArgument(op.Pos(), "row");
+ std::transform(sorted.cbegin(), sorted.cend(), nodes.begin(), [&](const std::pair<TString, bool>& item) {
+ return ctx.NewCallable(op.Pos(), "Member", {arg, ctx.NewAtom(op.Pos(), item.first)});
+ });
+ selector = ctx.NewLambda(op.Pos(), ctx.NewArguments(op.Pos(), {std::move(arg)}),
+ nodes.size() > 1U ? ctx.NewList(op.Pos(), std::move(nodes)) : std::move(nodes.front()));
+ }
+ return std::make_pair(direct, selector);
+ }
+
void PushSkipStat(const TStringBuf& statName, const TStringBuf& nodeName) const {
PushHybridStat(statName, nodeName, "SkipReasons");
PushHybridStat("Skip", nodeName);
diff --git a/ydb/library/yql/tests/sql/hybrid_file/part7/canondata/result.json b/ydb/library/yql/tests/sql/hybrid_file/part7/canondata/result.json
index e27b106512..846dbb9e08 100644
--- a/ydb/library/yql/tests/sql/hybrid_file/part7/canondata/result.json
+++ b/ydb/library/yql/tests/sql/hybrid_file/part7/canondata/result.json
@@ -2297,16 +2297,16 @@
],
"test.test[produce-reduce_with_assume--Debug]": [
{
- "checksum": "4f9163dbb74e5d73aba069e02b990cb3",
- "size": 3289,
- "uri": "https://{canondata_backend}/1936842/15d1b251a19a947bc78bcd914d26903ce91d665f/resource.tar.gz#test.test_produce-reduce_with_assume--Debug_/opt.yql_patched"
+ "checksum": "c96fe0f70d0d94fc7237f1a660b083b5",
+ "size": 3349,
+ "uri": "https://{canondata_backend}/1847551/acd36ac04bd4a5b96c5ca1824b29d5b8562dc682/resource.tar.gz#test.test_produce-reduce_with_assume--Debug_/opt.yql_patched"
}
],
"test.test[produce-reduce_with_assume--Plan]": [
{
- "checksum": "b0096f865c23e37107180610b53aa6a7",
- "size": 4423,
- "uri": "https://{canondata_backend}/212715/61f0c59354c0aee96d5e21e3fd5f5993b2817ac3/resource.tar.gz#test.test_produce-reduce_with_assume--Plan_/plan.txt"
+ "checksum": "c111884233e68b6bed387cc377956b24",
+ "size": 4520,
+ "uri": "https://{canondata_backend}/1847551/acd36ac04bd4a5b96c5ca1824b29d5b8562dc682/resource.tar.gz#test.test_produce-reduce_with_assume--Plan_/plan.txt"
}
],
"test.test[sampling-bind_topsort-default.txt-Debug]": [
diff --git a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json
index 2d79e09142..f2de6a35d6 100644
--- a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json
+++ b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json
@@ -30220,9 +30220,9 @@
],
"test_sql_format.test[produce-reduce_with_assume]": [
{
- "checksum": "4daf2f046eb660f2a56a1f84928ee6c4",
- "size": 412,
- "uri": "https://{canondata_backend}/1917492/ff234d5884581cc297a96d70a1828b0d89366fd5/resource.tar.gz#test_sql_format.test_produce-reduce_with_assume_/formatted.sql"
+ "checksum": "f66b71a12b26ca4bc97619a3b43cbd5e",
+ "size": 378,
+ "uri": "https://{canondata_backend}/1936273/d3ce9e6be300ac95ccb90e8a7420184de361fe19/resource.tar.gz#test_sql_format.test_produce-reduce_with_assume_/formatted.sql"
}
],
"test_sql_format.test[produce-reduce_with_assume_in_subquery]": [
diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_with_assume.sql b/ydb/library/yql/tests/sql/suites/produce/reduce_with_assume.sql
index b05403414c..3f51be0d12 100644
--- a/ydb/library/yql/tests/sql/suites/produce/reduce_with_assume.sql
+++ b/ydb/library/yql/tests/sql/suites/produce/reduce_with_assume.sql
@@ -1,6 +1,5 @@
/* postgres can not */
/* multirun can not */
-/* hybridfile can not YQL-18225 */
/* syntax version 1 */
USE plato;