diff options
author | Maxim Kovalev <maxkovalev@ydb.tech> | 2024-04-22 16:59:08 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-22 16:59:08 +0300 |
commit | 6823b419d923d8854473893de9747e7a73c4bc1d (patch) | |
tree | 6ac508feaa4202eecfad488e62438d90ebfa7b88 | |
parent | 0fea310a336608fa290c8efee866be9e83318335 (diff) | |
download | ydb-6823b419d923d8854473893de9747e7a73c4bc1d.tar.gz |
YQL-18225: Fix sorted output when rewritting reduce in hybrid (#3971)
4 files changed, 46 insertions, 30 deletions
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index 7c7161852c..1828263a72 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -254,19 +254,6 @@ private: } auto newWorld = ApplySyncListToWorld(sort.World().Ptr(), syncList, ctx); - TExprNode::TPtr direct, selector; - if (const auto& sorted = TYtOutTableInfo(sort.Output().Item(0)).RowSpec->GetForeignSort(); !sorted.empty()) { - TExprNode::TListType nodes(sorted.size()); - std::transform(sorted.cbegin(), sorted.cend(), nodes.begin(), [&](const std::pair<TString, bool>& item) { return MakeBool(sort.Pos(), item.second, ctx); }); - direct = nodes.size() > 1U ? ctx.NewList(sort.Pos(), std::move(nodes)) : std::move(nodes.front()); - nodes.resize(sorted.size()); - auto arg = ctx.NewArgument(sort.Pos(), "row"); - std::transform(sorted.cbegin(), sorted.cend(), nodes.begin(), [&](const std::pair<TString, bool>& item) { - return ctx.NewCallable(sort.Pos(), "Member", {arg, ctx.NewAtom(sort.Pos(), item.first)}); - }); - selector = ctx.NewLambda(sort.Pos(), ctx.NewArguments(sort.Pos(), {std::move(arg)}), nodes.size() > 1U ? ctx.NewList(sort.Pos(), std::move(nodes)) : std::move(nodes.front())); - } - const auto input = Build<TCoToFlow>(ctx, sort.Pos()) .Input<TYtTableContent>() .Input<TYtReadTable>() @@ -286,6 +273,7 @@ private: limit = GetLimitExpr(limitNode, ctx); } + auto [direct, selector] = GetOutputSortSettings(sort, ctx); auto work = direct && selector ? limit ? Build<TCoTopSort>(ctx, sort.Pos()) @@ -605,6 +593,23 @@ private: .Build() .Done(); + auto partitionsByKeys = Build<TCoPartitionsByKeys>(ctx, reduce.Pos()) + .Input(std::move(input)) + .KeySelectorLambda(extract) + .SortDirections(std::move(sortDirs)) + .SortKeySelectorLambda(std::move(sortKeys)) + .ListHandlerLambda(std::move(reducer)) + .Done().Ptr(); + + auto [direct, selector] = GetOutputSortSettings(reduce, ctx); + if (direct && selector) { + partitionsByKeys = Build<TCoSort>(ctx, reduce.Pos()) + .Input(std::move(partitionsByKeys)) + .SortDirections(std::move(direct)) + .KeySelectorLambda(std::move(selector)) + .Done().Ptr(); + } + return Build<TYtTryFirst>(ctx, reduce.Pos()) .template First<TYtDqProcessWrite>() .World(std::move(newWorld)) @@ -617,13 +622,7 @@ private: .template Program<TCoLambda>() .Args({}) .template Body<TDqWrite>() - .template Input<TCoPartitionsByKeys>() - .Input(std::move(input)) - .KeySelectorLambda(extract) - .SortDirections(std::move(sortDirs)) - .SortKeySelectorLambda(std::move(sortKeys)) - .ListHandlerLambda(std::move(reducer)) - .Build() + .Input(std::move(partitionsByKeys)) .Provider().Value(YtProviderName).Build() .template Settings<TCoNameValueTupleList>().Build() .Build() @@ -674,6 +673,24 @@ private: return node; } + std::pair<TExprNode::TPtr, TExprNode::TPtr> GetOutputSortSettings(const TYtOutputOpBase& op, TExprContext& ctx) const { + TExprNode::TPtr direct, selector; + if (const auto& sorted = TYtOutTableInfo(op.Output().Item(0)).RowSpec->GetForeignSort(); !sorted.empty()) { + TExprNode::TListType nodes(sorted.size()); + std::transform(sorted.cbegin(), sorted.cend(), nodes.begin(), + [&](const std::pair<TString, bool>& item) { return MakeBool(op.Pos(), item.second, ctx); }); + direct = nodes.size() > 1U ? ctx.NewList(op.Pos(), std::move(nodes)) : std::move(nodes.front()); + nodes.resize(sorted.size()); + auto arg = ctx.NewArgument(op.Pos(), "row"); + std::transform(sorted.cbegin(), sorted.cend(), nodes.begin(), [&](const std::pair<TString, bool>& item) { + return ctx.NewCallable(op.Pos(), "Member", {arg, ctx.NewAtom(op.Pos(), item.first)}); + }); + selector = ctx.NewLambda(op.Pos(), ctx.NewArguments(op.Pos(), {std::move(arg)}), + nodes.size() > 1U ? ctx.NewList(op.Pos(), std::move(nodes)) : std::move(nodes.front())); + } + return std::make_pair(direct, selector); + } + void PushSkipStat(const TStringBuf& statName, const TStringBuf& nodeName) const { PushHybridStat(statName, nodeName, "SkipReasons"); PushHybridStat("Skip", nodeName); diff --git a/ydb/library/yql/tests/sql/hybrid_file/part7/canondata/result.json b/ydb/library/yql/tests/sql/hybrid_file/part7/canondata/result.json index e27b106512..846dbb9e08 100644 --- a/ydb/library/yql/tests/sql/hybrid_file/part7/canondata/result.json +++ b/ydb/library/yql/tests/sql/hybrid_file/part7/canondata/result.json @@ -2297,16 +2297,16 @@ ], "test.test[produce-reduce_with_assume--Debug]": [ { - "checksum": "4f9163dbb74e5d73aba069e02b990cb3", - "size": 3289, - "uri": "https://{canondata_backend}/1936842/15d1b251a19a947bc78bcd914d26903ce91d665f/resource.tar.gz#test.test_produce-reduce_with_assume--Debug_/opt.yql_patched" + "checksum": "c96fe0f70d0d94fc7237f1a660b083b5", + "size": 3349, + "uri": "https://{canondata_backend}/1847551/acd36ac04bd4a5b96c5ca1824b29d5b8562dc682/resource.tar.gz#test.test_produce-reduce_with_assume--Debug_/opt.yql_patched" } ], "test.test[produce-reduce_with_assume--Plan]": [ { - "checksum": "b0096f865c23e37107180610b53aa6a7", - "size": 4423, - "uri": "https://{canondata_backend}/212715/61f0c59354c0aee96d5e21e3fd5f5993b2817ac3/resource.tar.gz#test.test_produce-reduce_with_assume--Plan_/plan.txt" + "checksum": "c111884233e68b6bed387cc377956b24", + "size": 4520, + "uri": "https://{canondata_backend}/1847551/acd36ac04bd4a5b96c5ca1824b29d5b8562dc682/resource.tar.gz#test.test_produce-reduce_with_assume--Plan_/plan.txt" } ], "test.test[sampling-bind_topsort-default.txt-Debug]": [ diff --git a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json index 2d79e09142..f2de6a35d6 100644 --- a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json +++ b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json @@ -30220,9 +30220,9 @@ ], "test_sql_format.test[produce-reduce_with_assume]": [ { - "checksum": "4daf2f046eb660f2a56a1f84928ee6c4", - "size": 412, - "uri": "https://{canondata_backend}/1917492/ff234d5884581cc297a96d70a1828b0d89366fd5/resource.tar.gz#test_sql_format.test_produce-reduce_with_assume_/formatted.sql" + "checksum": "f66b71a12b26ca4bc97619a3b43cbd5e", + "size": 378, + "uri": "https://{canondata_backend}/1936273/d3ce9e6be300ac95ccb90e8a7420184de361fe19/resource.tar.gz#test_sql_format.test_produce-reduce_with_assume_/formatted.sql" } ], "test_sql_format.test[produce-reduce_with_assume_in_subquery]": [ diff --git a/ydb/library/yql/tests/sql/suites/produce/reduce_with_assume.sql b/ydb/library/yql/tests/sql/suites/produce/reduce_with_assume.sql index b05403414c..3f51be0d12 100644 --- a/ydb/library/yql/tests/sql/suites/produce/reduce_with_assume.sql +++ b/ydb/library/yql/tests/sql/suites/produce/reduce_with_assume.sql @@ -1,6 +1,5 @@ /* postgres can not */ /* multirun can not */ -/* hybridfile can not YQL-18225 */ /* syntax version 1 */ USE plato; |