diff options
author | udovichenko-r <udovichenko-r@yandex-team.com> | 2024-12-18 14:20:22 +0300 |
---|---|---|
committer | udovichenko-r <udovichenko-r@yandex-team.com> | 2024-12-18 14:54:02 +0300 |
commit | bd7f3c57896d359cdaadef4d5645574d1d60d541 (patch) | |
tree | f6f9f593b710810c68b635c9dd450099d4a041f0 | |
parent | 810bee0857cbc4521eb08d8e031ad426255c6cbe (diff) | |
download | ydb-bd7f3c57896d359cdaadef4d5645574d1d60d541.tar.gz |
[yt provider] Use force_transform for merge on input tables with erasure codec
YQL-14195
commit_hash:bff9b3549fe1dbc0fae4a3a552e4d037e55a85ba
12 files changed, 93 insertions, 32 deletions
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json index 0c46d1d209..f687bca643 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/result.json +++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json @@ -8042,6 +8042,13 @@ "uri": "https://{canondata_backend}/1942173/99e88108149e222741552e7e6cddef041d6a2846/resource.tar.gz#test_sql2yql.test_insert-fail_read_view_after_modify_/sql.yql" } ], + "test_sql2yql.test[insert-from_erasure_to_none]": [ + { + "checksum": "124c99a11a96bfb2da3f97a3d7ce782f", + "size": 1296, + "uri": "https://{canondata_backend}/1937424/fccb5c0ff69cf69d6e37662f3d3d4a9967eea7ca/resource.tar.gz#test_sql2yql.test_insert-from_erasure_to_none_/sql.yql" + } + ], "test_sql2yql.test[insert-from_two_sorted_by_calc]": [ { "checksum": "5ab9900c33a9489a6b4fe7754106d398", @@ -25743,6 +25750,11 @@ "uri": "file://test_sql_format.test_insert-fail_read_view_after_modify_/formatted.sql" } ], + "test_sql_format.test[insert-from_erasure_to_none]": [ + { + "uri": "file://test_sql_format.test_insert-from_erasure_to_none_/formatted.sql" + } + ], "test_sql_format.test[insert-from_two_sorted_by_calc]": [ { "uri": "file://test_sql_format.test_insert-from_two_sorted_by_calc_/formatted.sql" diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_insert-from_erasure_to_none_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_insert-from_erasure_to_none_/formatted.sql new file mode 100644 index 0000000000..171b1b6107 --- /dev/null +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_insert-from_erasure_to_none_/formatted.sql @@ -0,0 +1,10 @@ +USE plato; + +PRAGMA yt.PublishedErasureCodec = 'none'; + +INSERT INTO Output +SELECT + * +FROM + Input +LIMIT 100; diff --git a/yql/essentials/tests/sql/suites/insert/erasure.txt b/yql/essentials/tests/sql/suites/insert/erasure.txt new file mode 100644 index 0000000000..65949ea745 --- /dev/null +++ b/yql/essentials/tests/sql/suites/insert/erasure.txt @@ -0,0 +1,4 @@ +{"key"="075";"subkey"="1";"value"="abc"}; +{"key"="800";"subkey"="2";"value"="ddd"}; +{"key"="020";"subkey"="3";"value"="q"}; +{"key"="150";"subkey"="4";"value"="qzz"}; diff --git a/yql/essentials/tests/sql/suites/insert/erasure.txt.attr b/yql/essentials/tests/sql/suites/insert/erasure.txt.attr new file mode 100644 index 0000000000..0214647b1c --- /dev/null +++ b/yql/essentials/tests/sql/suites/insert/erasure.txt.attr @@ -0,0 +1,10 @@ +{ + "_yql_row_spec" = { + "Type"=["StructType";[ + ["key";["DataType";"String"]]; + ["subkey";["DataType";"String"]]; + ["value";["DataType";"String"]]; + ]] + }; + "erasure_codec" = "lrc_12_2_2" +} diff --git a/yql/essentials/tests/sql/suites/insert/from_erasure_to_none.cfg b/yql/essentials/tests/sql/suites/insert/from_erasure_to_none.cfg new file mode 100644 index 0000000000..b761812b2d --- /dev/null +++ b/yql/essentials/tests/sql/suites/insert/from_erasure_to_none.cfg @@ -0,0 +1,2 @@ +in Input erasure.txt +out Output output.txt diff --git a/yql/essentials/tests/sql/suites/insert/from_erasure_to_none.sql b/yql/essentials/tests/sql/suites/insert/from_erasure_to_none.sql new file mode 100644 index 0000000000..240cac66fb --- /dev/null +++ b/yql/essentials/tests/sql/suites/insert/from_erasure_to_none.sql @@ -0,0 +1,8 @@ +USE plato; + +PRAGMA yt.PublishedErasureCodec = 'none'; + +INSERT INTO Output +SELECT * +FROM Input +LIMIT 100; diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp index c742d4f06b..c5357e28bd 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file.cpp @@ -1235,6 +1235,12 @@ private: req.Table(), attrs, req.IgnoreYamrDsv(), req.IgnoreWeakSchema() ); + if (attrs.AsMap().contains("erasure_codec") && attrs["erasure_codec"].AsString() != "none") { + info.Attrs["erasure_codec"] = attrs["erasure_codec"].AsString(); + } + if (attrs.AsMap().contains("optimize_for") && attrs["optimize_for"].AsString() != "scan") { + info.Attrs["optimize_for"] = attrs["optimize_for"].AsString(); + } if (attrs.AsMap().contains("schema_mode") && attrs["schema_mode"].AsString() == "weak") { info.Attrs["schema_mode"] = attrs["schema_mode"].AsString(); } diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index 98347b2c8e..4152ae1e0e 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -3355,7 +3355,7 @@ private: TFuture<void> DoMerge(TYtMerge merge, const TExecContext<TRunOptions>::TPtr& execCtx) { YQL_ENSURE(execCtx->OutTables_.size() == 1); - bool forceTransform = NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::TransformColGroups); + bool forceTransform = NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform); bool combineChunks = NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::CombineChunks); TMaybe<ui64> limit = GetLimit(merge.Settings().Ref()); diff --git a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp index b42173cf62..e3230f6f32 100644 --- a/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp +++ b/yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp @@ -409,7 +409,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::MergeToCopy(TExprBase n return node; } - if (NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::TransformColGroups | EYtSettingType::CombineChunks)) { + if (NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform | EYtSettingType::CombineChunks)) { return node; } @@ -484,42 +484,51 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::ForceTransform(TExprBas return TExprBase(ctx.ChangeChild(merge.Ref(), TYtMerge::idx_Settings, NYql::AddSetting(merge.Settings().Ref(), EYtSettingType::ForceTransform, {}, ctx))); } + bool needTransform = false; const auto cluster = merge.DataSink().Cluster().StringValue(); - if (State_->Configuration->OptimizeFor.Get(cluster).GetOrElse(NYT::OF_LOOKUP_ATTR) == NYT::OF_LOOKUP_ATTR) { - return node; - } - TString outGroup; - if (auto setting = NYql::GetSetting(merge.Output().Item(0).Settings().Ref(), EYtSettingType::ColumnGroups)) { - outGroup = setting->Tail().Content(); + if (State_->Configuration->OptimizeFor.Get(cluster).GetOrElse(NYT::OF_LOOKUP_ATTR) != NYT::OF_LOOKUP_ATTR) { + TString outGroup; + if (auto setting = NYql::GetSetting(merge.Output().Item(0).Settings().Ref(), EYtSettingType::ColumnGroups)) { + outGroup = setting->Tail().Content(); + } + + std::vector<TString> inputColGroupSpecs; + for (const auto& path: merge.Input().Item(0).Paths()) { + inputColGroupSpecs.emplace_back(); + if (auto table = path.Table().Maybe<TYtTable>()) { + if (auto tableDesc = State_->TablesData->FindTable(cluster, TString{TYtTableInfo::GetTableLabel(table.Cast())}, TEpochInfo::Parse(table.Cast().Epoch().Ref()))) { + inputColGroupSpecs.back() = tableDesc->ColumnGroupSpec; + } + } else if (auto out = path.Table().Maybe<TYtOutput>()) { + if (auto setting = NYql::GetSetting(GetOutputOp(out.Cast()).Output().Item(FromString<ui32>(out.Cast().OutIndex().Value())).Settings().Ref(), EYtSettingType::ColumnGroups)) { + inputColGroupSpecs.back() = setting->Tail().Content(); + } + } + } + + if (!outGroup.empty() && AnyOf(inputColGroupSpecs, [&outGroup](const auto& g) { return outGroup != g; })) { + needTransform = true; + } + if (outGroup.empty() && AnyOf(inputColGroupSpecs, [](const auto& g) { return !g.empty(); })) { + needTransform = true; + } } - std::vector<TString> inputColGroupSpecs; + const auto erasureCodec = ToString(State_->Configuration->TemporaryErasureCodec.Get(cluster).GetOrElse(NYT::EErasureCodecAttr::EC_NONE_ATTR)); for (const auto& path: merge.Input().Item(0).Paths()) { - inputColGroupSpecs.emplace_back(); if (auto table = path.Table().Maybe<TYtTable>()) { - if (auto tableDesc = State_->TablesData->FindTable(cluster, TString{TYtTableInfo::GetTableLabel(table.Cast())}, TEpochInfo::Parse(table.Cast().Epoch().Ref()))) { - inputColGroupSpecs.back() = tableDesc->ColumnGroupSpec; - } - } else if (auto out = path.Table().Maybe<TYtOutput>()) { - if (auto setting = NYql::GetSetting(GetOutputOp(out.Cast()).Output().Item(FromString<ui32>(out.Cast().OutIndex().Value())).Settings().Ref(), EYtSettingType::ColumnGroups)) { - inputColGroupSpecs.back() = setting->Tail().Content(); + if (TYtTableBaseInfo::GetMeta(table.Cast())->Attrs.Value("erasure_codec", "none") != erasureCodec) { + needTransform = true; + break; } } } - bool needTransformColGroups = false; - if (!outGroup.empty() && AnyOf(inputColGroupSpecs, [&outGroup](const auto& g) { return outGroup != g; })) { - needTransformColGroups = true; - } - if (outGroup.empty() && AnyOf(inputColGroupSpecs, [](const auto& g) { return !g.empty(); })) { - needTransformColGroups = true; - } - - if (needTransformColGroups && !NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::TransformColGroups)) { - return TExprBase(ctx.ChangeChild(merge.Ref(), TYtMerge::idx_Settings, NYql::AddSetting(merge.Settings().Ref(), EYtSettingType::TransformColGroups, {}, ctx))); - } else if (!needTransformColGroups && NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::TransformColGroups)) { - return TExprBase(ctx.ChangeChild(merge.Ref(), TYtMerge::idx_Settings, NYql::RemoveSetting(merge.Settings().Ref(), EYtSettingType::TransformColGroups, ctx))); + if (needTransform && !NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::SoftTransform)) { + return TExprBase(ctx.ChangeChild(merge.Ref(), TYtMerge::idx_Settings, NYql::AddSetting(merge.Settings().Ref(), EYtSettingType::SoftTransform, {}, ctx))); + } else if (!needTransform && NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::SoftTransform)) { + return TExprBase(ctx.ChangeChild(merge.Ref(), TYtMerge::idx_Settings, NYql::RemoveSetting(merge.Settings().Ref(), EYtSettingType::SoftTransform, ctx))); } return node; } diff --git a/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp b/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp index b422af1bf2..56b1d6866f 100644 --- a/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp @@ -1013,7 +1013,7 @@ private: auto merge = TYtMerge(input); - if (!ValidateSettings(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::TransformColGroups | EYtSettingType::CombineChunks | EYtSettingType::Limit | EYtSettingType::KeepSorted | EYtSettingType::NoDq, ctx)) { + if (!ValidateSettings(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::SoftTransform | EYtSettingType::CombineChunks | EYtSettingType::Limit | EYtSettingType::KeepSorted | EYtSettingType::NoDq, ctx)) { return TStatus::Error; } diff --git a/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp b/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp index 0f4b017e7c..d54ec13077 100644 --- a/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_op_settings.cpp @@ -423,7 +423,7 @@ bool ValidateSettings(const TExprNode& settingsNode, EYtSettingTypes accepted, T case EYtSettingType::IgnoreNonExisting: case EYtSettingType::WarnNonExisting: case EYtSettingType::ForceTransform: - case EYtSettingType::TransformColGroups: + case EYtSettingType::SoftTransform: case EYtSettingType::CombineChunks: case EYtSettingType::WithQB: case EYtSettingType::Inline: diff --git a/yt/yql/providers/yt/provider/yql_yt_op_settings.h b/yt/yql/providers/yt/provider/yql_yt_op_settings.h index 2627bbb7f7..f249a51ddf 100644 --- a/yt/yql/providers/yt/provider/yql_yt_op_settings.h +++ b/yt/yql/providers/yt/provider/yql_yt_op_settings.h @@ -87,7 +87,7 @@ enum class EYtSettingType: ui64 { ReduceBy /* "reduceBy" */, // hybrid supported ReduceFilterBy /* "reduceFilterBy" */, ForceTransform /* "forceTransform" */, // hybrid supported - TransformColGroups /* "transformColGroups" */, // hybrid supported + SoftTransform /* "softTransform" */, // hybrid supported WeakFields /* "weakFields" */, Sharded /* "sharded" */, CombineChunks /* "combineChunks" */, @@ -171,7 +171,7 @@ const auto DqReadSupportedSettings = EYtSettingType::SysColumns | EYtSettingType const auto DqOpSupportedSettings = EYtSettingType::Ordered | EYtSettingType::Limit | EYtSettingType::SortLimitBy | EYtSettingType::SortBy | EYtSettingType::ReduceBy | EYtSettingType::ForceTransform | EYtSettingType::JobCount | EYtSettingType::JoinReduce | EYtSettingType::FirstAsPrimary | EYtSettingType::Flow | EYtSettingType::BlockInputReady | EYtSettingType::BlockInputApplied | EYtSettingType::BlockOutputReady | EYtSettingType::BlockOutputApplied | - EYtSettingType::KeepSorted | EYtSettingType::KeySwitch | EYtSettingType::ReduceInputType | EYtSettingType::MapOutputType | EYtSettingType::Sharded | EYtSettingType::TransformColGroups; + EYtSettingType::KeepSorted | EYtSettingType::KeySwitch | EYtSettingType::ReduceInputType | EYtSettingType::MapOutputType | EYtSettingType::Sharded | EYtSettingType::SoftTransform; /////////////////////////////////////////////////////////////////////////////////////////////// |