diff options
author | udovichenko-r <rvu@ydb.tech> | 2023-11-02 19:24:28 +0300 |
---|---|---|
committer | udovichenko-r <rvu@ydb.tech> | 2023-11-02 19:57:18 +0300 |
commit | a528d5d25d42706fe385120b27e1df3a257823fb (patch) | |
tree | baf2feb973b022dc9aa2e01f6740b08b9ca668ef | |
parent | 27204b42f48b633cfb3eecc9f13b4331d32516b3 (diff) | |
download | ydb-a528d5d25d42706fe385120b27e1df3a257823fb.tar.gz |
[dq] YT specific optimizers for DQ
YQL-16013
13 files changed, 257 insertions, 285 deletions
diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp index f1681187ce..5c2f0628e4 100644 --- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp @@ -71,7 +71,8 @@ public: AddHandler(0, &TDqQuery::Match, HNDL(MergeQueriesWithSinks)); AddHandler(0, &TCoSqlIn::Match, HNDL(SqlInDropCompact)); AddHandler(0, &TDqReplicate::Match, HNDL(ReplicateFieldSubset)); - AddHandler(0, &TDqReadWrapBase::Match, HNDL(DqReadWrapByProvider)); + + AddHandler(1, &TDqReadWrapBase::Match, HNDL(DqReadWrapByProvider)); #undef HNDL } @@ -85,9 +86,10 @@ protected: return node; } - TMaybeNode<TExprBase> UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { + TMaybeNode<TExprBase> UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& /*getParents*/) const { const auto unordered = node.Cast<TCoUnorderedBase>(); if (const auto maybeRead = unordered.Input().Maybe<TDqReadWrapBase>().Input()) { + /* if (Config->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate)) { const TParentsMap* parentsMap = getParents(); auto parentsIt = parentsMap->find(unordered.Input().Raw()); @@ -96,6 +98,7 @@ protected: return node; } } + */ auto providerRead = maybeRead.Cast(); if (auto dqOpt = GetDqOptCallback(providerRead)) { auto updatedRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx); @@ -111,9 +114,10 @@ protected: return node; } - TMaybeNode<TExprBase> ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { + TMaybeNode<TExprBase> ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& /*getParents*/) { auto extract = node.Cast<TCoExtractMembers>(); if (const auto maybeRead = extract.Input().Maybe<TDqReadWrap>().Input()) { + /* if (Config->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate)) { const TParentsMap* parentsMap = getParents(); auto parentsIt = parentsMap->find(extract.Input().Raw()); @@ -122,7 +126,7 @@ protected: return node; } } - + */ auto providerRead = maybeRead.Cast(); if (auto dqOpt = GetDqOptCallback(providerRead)) { auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), extract.Members().Ptr(), ctx); @@ -212,9 +216,10 @@ protected: if (outReaders.empty()) { return {}; } - if (inReaders != outReaders) { - YQL_ENSURE(outReaders.size() <= inReaders.size()); + if (inReaders == outReaders) { + return node; } + YQL_ENSURE(outReaders.size() <= inReaders.size()); size_t i = 0; for (; i < outReaders.size(); ++i) { newChildren[list[i].first] = ctx.ChangeChild(*newChildren[list[i].first], TDqReadWrapBase::idx_Input, std::move(outReaders[i])); diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt index dd6cc43535..bcd890489a 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt @@ -105,6 +105,7 @@ target_sources(providers-yt-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table_desc.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp ) diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt index 6d095ca3b0..4fb7023356 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt @@ -106,6 +106,7 @@ target_sources(providers-yt-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table_desc.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp ) diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt index 6d095ca3b0..4fb7023356 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt @@ -106,6 +106,7 @@ target_sources(providers-yt-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table_desc.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp ) diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt index dd6cc43535..bcd890489a 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt @@ -105,6 +105,7 @@ target_sources(providers-yt-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table_desc.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp ) diff --git a/ydb/library/yql/providers/yt/provider/ya.make b/ydb/library/yql/providers/yt/provider/ya.make index c0a858a1b3..744053b65e 100644 --- a/ydb/library/yql/providers/yt/provider/ya.make +++ b/ydb/library/yql/providers/yt/provider/ya.make @@ -37,6 +37,7 @@ SRCS( yql_yt_table_desc.cpp yql_yt_table.cpp yql_yt_dq_integration.cpp + yql_yt_dq_optimize.cpp yql_yt_dq_hybrid.cpp yql_yt_wide_flow.cpp ) diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp index f11e0f1eff..4d96812bab 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp @@ -4,6 +4,7 @@ #include "yql_yt_key.h" #include "yql_yt_op_settings.h" #include "yql_yt_dq_integration.h" +#include "yql_yt_dq_optimize.h" #include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> #include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h> @@ -87,6 +88,7 @@ public: bool collectNodes = mode == EReleaseTempDataMode::Immediate; return MakeHolder<TYtDataSourceTrackableNodeProcessor>(collectNodes); }) + , DqOptimizer_([this]() { return CreateYtDqOptimizers(State_); }) { } @@ -483,6 +485,10 @@ public: return State_->DqIntegration_.Get(); } + IDqOptimization* GetDqOptimization() override { + return DqOptimizer_.Get(); + } + private: TExprNode::TPtr InjectUdfRemapperOrView(TYtRead readNode, TExprContext& ctx, bool fromReadSchema) { const bool weakConcat = NYql::HasSetting(readNode.Arg(4).Ref(), EYtSettingType::WeakConcat); @@ -845,6 +851,7 @@ private: TLazyInitHolder<IGraphTransformer> ConstraintTransformer_; TLazyInitHolder<TExecTransformerBase> ExecTransformer_; TLazyInitHolder<TYtDataSourceTrackableNodeProcessor> TrackableNodeProcessor_; + TLazyInitHolder<IDqOptimization> DqOptimizer_; }; TIntrusivePtr<IDataProvider> CreateYtDataSource(TYtState::TPtr state) { diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp new file mode 100644 index 0000000000..db75d69c2e --- /dev/null +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp @@ -0,0 +1,197 @@ +#include "yql_yt_dq_optimize.h" +#include "yql_yt_helpers.h" +#include "yql_yt_optimize.h" + +#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> +#include <ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.h> +#include <ydb/library/yql/core/yql_expr_optimize.h> +#include <ydb/library/yql/utils/log/log.h> + + +namespace NYql { + +using namespace NNodes; + +class TYtDqOptimizers: public TDqOptimizationBase { +public: + TYtDqOptimizers(TYtState::TPtr state) + : State_(state) + { + } + + TExprNode::TPtr RewriteRead(const TExprNode::TPtr& reader, TExprContext& ctx) override { + if (auto disabledOpts = State_->Configuration->DisableOptimizers.Get(); disabledOpts && disabledOpts->contains(TStringBuilder() << "YtDqOptimizers-" << __FUNCTION__)) { + return reader; + } + auto ytReader = TYtReadTable(reader); + + TYtDSource dataSource = GetDataSource(ytReader, ctx); + if (!State_->Configuration->_EnableYtPartitioning.Get(dataSource.Cluster().StringValue()).GetOrElse(false)) { + return reader; + } + + TSyncMap syncList; + auto ret = OptimizeReadWithSettings(reader, false, false, syncList, State_, ctx); + YQL_ENSURE(syncList.empty()); + if (ret && ret != reader) { + YQL_CLOG(DEBUG, ProviderYt) << __FUNCTION__; + } + return ret; + } + + TExprNode::TPtr ApplyExtractMembers(const TExprNode::TPtr& reader, const TExprNode::TPtr& members, TExprContext& ctx) override { + if (auto disabledOpts = State_->Configuration->DisableOptimizers.Get(); disabledOpts && disabledOpts->contains(TStringBuilder() << "YtDqOptimizers-" << __FUNCTION__)) { + return reader; + } + auto ytReader = TYtReadTable(reader); + + TVector<TYtSection> sections; + for (auto section: ytReader.Input()) { + sections.push_back(UpdateInputFields(section, TExprBase(members), ctx)); + } + YQL_CLOG(DEBUG, ProviderYt) << __FUNCTION__; + return Build<TYtReadTable>(ctx, ytReader.Pos()) + .InitFrom(ytReader) + .Input() + .Add(sections) + .Build() + .Done().Ptr(); + } + + TExprNode::TPtr ApplyTakeOrSkip(const TExprNode::TPtr& reader, const TExprNode::TPtr& countBase, TExprContext& ctx) override { + if (auto disabledOpts = State_->Configuration->DisableOptimizers.Get(); disabledOpts && disabledOpts->contains(TStringBuilder() << "YtDqOptimizers-" << __FUNCTION__)) { + return reader; + } + auto ytReader = TYtReadTable(reader); + auto count = TCoCountBase(countBase); + + if (ytReader.Input().Size() != 1) { + return reader; + } + + TYtDSource dataSource = GetDataSource(ytReader, ctx); + TString cluster = dataSource.Cluster().StringValue(); + + if (!State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false)) { + return reader; + } + + TSyncMap syncList; + if (!IsYtCompleteIsolatedLambda(count.Count().Ref(), syncList, cluster, true, false)) { + return reader; + } + + TYtSection section = ytReader.Input().Item(0); + if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::Sample)) { + return reader; + } + if (AnyOf(section.Paths(), [](const auto& path) { TYtPathInfo pathInfo(path); return (pathInfo.Table->Meta && pathInfo.Table->Meta->IsDynamic) || pathInfo.Ranges; })) { + return reader; + } + + YQL_CLOG(DEBUG, ProviderYt) << __FUNCTION__; + EYtSettingType settingType = count.Maybe<TCoSkip>() ? EYtSettingType::Skip : EYtSettingType::Take; + return Build<TYtReadTable>(ctx, ytReader.Pos()) + .InitFrom(ytReader) + .Input() + .Add() + .InitFrom(section) + .Settings(NYql::AddSetting(section.Settings().Ref(), settingType, count.Count().Ptr(), ctx)) + .Build() + .Build() + .Done().Ptr(); + } + + TExprNode::TPtr ApplyUnordered(const TExprNode::TPtr& reader, TExprContext& ctx) override { + if (auto disabledOpts = State_->Configuration->DisableOptimizers.Get(); disabledOpts && disabledOpts->contains(TStringBuilder() << "YtDqOptimizers-" << __FUNCTION__)) { + return reader; + } + auto ytReader = TYtReadTable(reader); + + TExprNode::TListType sections(ytReader.Input().Size()); + for (auto i = 0U; i < sections.size(); ++i) { + sections[i] = MakeUnorderedSection<true>(ytReader.Input().Item(i), ctx).Ptr(); + } + + YQL_CLOG(DEBUG, ProviderYt) << __FUNCTION__; + return Build<TYtReadTable>(ctx, ytReader.Pos()) + .InitFrom(ytReader) + .Input() + .Add(std::move(sections)) + .Build() + .Done().Ptr(); + } + + TExprNode::TListType ApplyExtend(const TExprNode::TListType& listOfReader, bool /*ordered*/, TExprContext& ctx) override { + if (auto disabledOpts = State_->Configuration->DisableOptimizers.Get(); disabledOpts && disabledOpts->contains(TStringBuilder() << "YtDqOptimizers-" << __FUNCTION__)) { + return listOfReader; + } + + // Group readres by cluster/settings + THashMap<std::pair<TStringBuf, TExprNode::TPtr>, std::vector<std::pair<size_t, TYtReadTable>>> readers; + for (size_t i = 0; i < listOfReader.size(); ++i) { + const auto child = listOfReader[i]; + auto ytRead = TYtReadTable(child); + if (ytRead.Input().Size() != 1) { + continue; + } + if (NYql::HasAnySetting(ytRead.Input().Item(0).Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip)) { + continue; + } + readers[std::make_pair(ytRead.DataSource().Cluster().Value(), ytRead.Input().Item(0).Settings().Ptr())].emplace_back(i, ytRead); + } + + if (readers.empty() || AllOf(readers, [](const auto& item) { return item.second.size() < 2; })) { + return listOfReader; + } + + YQL_CLOG(DEBUG, ProviderYt) << __FUNCTION__; + + TExprNode::TListType newListOfReader = listOfReader; + for (auto& item: readers) { + if (item.second.size() > 1) { + TSyncMap syncList; + TVector<TYtPath> paths; + for (auto& r: item.second) { + if (!r.second.World().Ref().IsWorld()) { + syncList.emplace(r.second.World().Ptr(), syncList.size()); + } + paths.insert(paths.end(), r.second.Input().Item(0).Paths().begin(), r.second.Input().Item(0).Paths().end()); + } + const auto ndx = item.second.front().first; + const auto& firstYtReader = item.second.front().second; + newListOfReader[ndx] = Build<TYtReadTable>(ctx, firstYtReader.Pos()) + .InitFrom(firstYtReader) + .World(ApplySyncListToWorld(ctx.NewWorld(firstYtReader.Pos()), syncList, ctx)) + .Input() + .Add() + .Paths() + .Add(paths) + .Build() + .Settings(firstYtReader.Input().Item(0).Settings()) + .Build() + .Build() + .Done().Ptr(); + + for (size_t i = 1; i < item.second.size(); ++i) { + newListOfReader[item.second[i].first] = nullptr; + } + } + } + + newListOfReader.erase(std::remove(newListOfReader.begin(), newListOfReader.end(), TExprNode::TPtr{}), newListOfReader.end()); + YQL_ENSURE(!newListOfReader.empty()); + + return newListOfReader; + } + +private: + TYtState::TPtr State_; +}; + +THolder<IDqOptimization> CreateYtDqOptimizers(TYtState::TPtr state) { + Y_ABORT_UNLESS(state); + return MakeHolder<TYtDqOptimizers>(state); +} + +} diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.h b/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.h new file mode 100644 index 0000000000..91bb6c7b98 --- /dev/null +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.h @@ -0,0 +1,13 @@ +#pragma once + +#include "yql_yt_provider.h" + +#include <ydb/library/yql/dq/integration/yql_dq_optimization.h> + +#include <util/generic/ptr.h> + +namespace NYql { + +THolder<IDqOptimization> CreateYtDqOptimizers(TYtState::TPtr state); + +} diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp index 13b0525e8e..dbfcd6d135 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp @@ -6,7 +6,6 @@ #include <ydb/library/yql/providers/yt/opt/yql_yt_join.h> #include <ydb/library/yql/providers/yt/opt/yql_yt_key_selector.h> #include <ydb/library/yql/providers/yt/common/yql_configuration.h> -#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> #include <ydb/library/yql/providers/common/transform/yql_optimize.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/codec/yql_codec_type_flags.h> @@ -65,16 +64,13 @@ public: AddHandler(1, &TCoAggregateBase::Match, HNDL(Aggregate)); AddHandler(1, &TCoExtractMembers::Match, HNDL(ExtractMembers)); AddHandler(1, &TCoExtractMembers::Match, HNDL(ExtractMembersOverContent)); - AddHandler(1, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqWrap)); AddHandler(1, &TCoRight::Match, HNDL(PushdownReadColumns)); AddHandler(1, &TYtTransientOpBase::Match, HNDL(PushdownOpColumns)); AddHandler(1, &TCoCountBase::Match, HNDL(TakeOrSkip)); - AddHandler(1, &TCoCountBase::Match, HNDL(TakeOrSkipOverDqWrap)); AddHandler(1, &TCoEquiJoin::Match, HNDL(SelfInnerJoinWithSameKeys)); AddHandler(1, &TCoExtendBase::Match, HNDL(ExtendOverSameMap)); AddHandler(1, &TCoFlatMapBase::Match, HNDL(FlatMapOverExtend)); AddHandler(1, &TCoTake::Match, HNDL(TakeOverExtend)); - AddHandler(1, &TCoExtendBase::Match, HNDL(ExtendOverDqWrap)); AddHandler(2, &TCoEquiJoin::Match, HNDL(ConvertToCommonTypeForForcedMergeJoin)); AddHandler(2, &TCoShuffleByKeys::Match, HNDL(ShuffleByKeys)); @@ -1331,29 +1327,6 @@ protected: .Done(); } - TMaybeNode<TExprBase> ExtractMembersOverDqWrap(TExprBase node, TExprContext& ctx) { - auto extract = node.Cast<TCoExtractMembers>(); - - if (auto maybeYtRead = extract.Input().Maybe<TDqReadWrapBase>().Input().Maybe<TYtReadTable>()) { - auto ytRead = maybeYtRead.Cast(); - - TVector<TYtSection> sections; - for (auto section: ytRead.Input()) { - sections.push_back(UpdateInputFields(section, extract.Members(), ctx)); - } - auto updatedRead = Build<TYtReadTable>(ctx, ytRead.Pos()) - .InitFrom(ytRead) - .Input() - .Add(sections) - .Build() - .Done().Ptr(); - - return TExprBase(ctx.ChangeChild(extract.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); - } - - return node; - } - TMaybeNode<TExprBase> TakeOrSkip(TExprBase node, TExprContext& ctx) const { auto countBase = node.Cast<TCoCountBase>(); auto input = countBase.Input(); @@ -1384,59 +1357,6 @@ protected: return ctx.ChangeChild(countBase.Input().Ref(), TYtTableContent::idx_Input, ConvertContentInputToRead(input, settings, ctx).Ptr()); } - TMaybeNode<TExprBase> TakeOrSkipOverDqWrap(TExprBase node, TExprContext& ctx) { - auto countBase = node.Cast<TCoCountBase>(); - - if (auto maybeYtRead = countBase.Input().Maybe<TDqReadWrapBase>().Input().Maybe<TYtReadTable>()) { - auto ytRead = maybeYtRead.Cast(); - if (ytRead.Input().Size() != 1) { - return node; - } - - TYtDSource dataSource = GetDataSource(ytRead, ctx); - TString cluster = dataSource.Cluster().StringValue(); - - if (!State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false)) { - return node; - } - - // TODO: support via precomputes - if (!TCoIntegralCtor::Match(countBase.Count().Raw())) { - return node; - } - - TSyncMap syncList; - if (!IsYtCompleteIsolatedLambda(countBase.Count().Ref(), syncList, cluster, true, false)) { - return node; - } - - TYtSection section = ytRead.Input().Item(0); - if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::Sample)) { - return node; - } - if (AnyOf(section.Paths(), [](const auto& path) { TYtPathInfo pathInfo(path); return (pathInfo.Table->Meta && pathInfo.Table->Meta->IsDynamic) || pathInfo.Ranges; })) { - return node; - } - - EYtSettingType settingType = node.Maybe<TCoSkip>() ? EYtSettingType::Skip : EYtSettingType::Take; - - auto updatedRead = Build<TYtReadTable>(ctx, ytRead.Pos()) - .InitFrom(ytRead) - .Input() - .Add() - .InitFrom(section) - .Settings(NYql::AddSetting(section.Settings().Ref(), settingType, countBase.Count().Ptr(), ctx)) - .Build() - .Build() - .Done().Ptr(); - - return TExprBase(ctx.ChangeChild(countBase.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); - - } - - return node; - } - TMaybeNode<TExprBase> BypassCopy(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { auto srcOut = node.Cast<TYtOutput>(); auto maybeCopy = srcOut.Operation().Maybe<TYtCopy>(); @@ -1713,69 +1633,6 @@ protected: .Done(); } - TMaybeNode<TExprBase> ExtendOverDqWrap(TExprBase node, TExprContext& ctx) const { - auto extend = node.Cast<TCoExtendBase>(); - const TExprNode* flags = nullptr; - const TExprNode* token = nullptr; - TString cluster; - const TExprNode* world = nullptr; - bool first = true; - TVector<TYtPath> paths; - TExprNode::TListType nonDQ; - TMaybe<TExprBase> anyDQ; - for (auto child: extend) { - if (!TDqReadWrapBase::Match(child.Raw())) { - nonDQ.push_back(child.Ptr()); - continue; - } - auto dqReadWrap = child.Cast<TDqReadWrapBase>(); - if (!dqReadWrap.Input().Maybe<TYtReadTable>()) { - nonDQ.push_back(child.Ptr()); - continue; - } - auto ytRead = dqReadWrap.Input().Cast<TYtReadTable>(); - if (ytRead.Input().Size() != 1 || ytRead.Input().Item(0).Settings().Size() != 0) { - nonDQ.push_back(child.Ptr()); - continue; - } - - if (first) { - flags = dqReadWrap.Flags().Raw(); - token = dqReadWrap.Token().Raw(); - cluster = ytRead.DataSource().Cluster().StringValue(); - world = ytRead.World().Raw(); - first = false; - } else if (flags != dqReadWrap.Flags().Raw() || token != dqReadWrap.Token().Raw() || cluster != ytRead.DataSource().Cluster().Value() || world != ytRead.World().Raw()) { - nonDQ.push_back(child.Ptr()); - continue; - } - anyDQ = child; - paths.insert(paths.end(), ytRead.Input().Item(0).Paths().begin(), ytRead.Input().Item(0).Paths().end()); - } - if (!anyDQ || extend.Ref().ChildrenSize() - nonDQ.size() < 2 || (nonDQ.size() && node.Maybe<TCoOrderedExtend>())) { - return node; - } - auto dqReadWrap = *anyDQ; - auto newRead = Build<TYtReadTable>(ctx, extend.Pos()) - .InitFrom(dqReadWrap.Cast<TDqReadWrapBase>().Input().Cast<TYtReadTable>()) - .Input() - .Add() - .Paths() - .Add(paths) - .Build() - .Settings() - .Build() - .Build() - .Build() - .Done().Ptr(); - if (!nonDQ.empty()) { - nonDQ.push_back(ctx.ChangeChild(dqReadWrap.Ref(), TDqReadWrapBase::idx_Input, std::move(newRead))); - return TExprBase(ctx.ChangeChildren(extend.Ref(), std::move(nonDQ))); - } - - return TExprBase(ctx.ChangeChild(dqReadWrap.Ref(), TDqReadWrapBase::idx_Input, std::move(newRead))); - } - TMaybeNode<TExprBase> DirectRowInFlatMap(TExprBase node, TExprContext& ctx) const { if (State_->Configuration->UseSystemColumns.Get().GetOrElse(DEFAULT_USE_SYS_COLUMNS)) { return node; @@ -2610,21 +2467,6 @@ protected: .Build() .Done(); } - } else if (const auto maybeYtRead = unordered.Input().Maybe<TDqReadWrapBase>().Input().Maybe<TYtReadTable>()) { - const auto ytRead = maybeYtRead.Cast(); - TExprNode::TListType sections(ytRead.Input().Size()); - for (auto i = 0U; i < sections.size(); ++i) { - sections[i] = MakeUnorderedSection<true>(ytRead.Input().Item(i), ctx).Ptr(); - } - - auto updatedRead = Build<TYtReadTable>(ctx, ytRead.Pos()) - .InitFrom(ytRead) - .Input() - .Add(std::move(sections)) - .Build() - .Done().Ptr(); - - return TExprBase(ctx.ChangeChild(unordered.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); } return node; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp index 9d80f80afd..d28baa9ef5 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp @@ -83,7 +83,6 @@ public: } else { AddHandler(0, Names({TYtMap::CallableName(), TYtMapReduce::CallableName()}), HNDL(ExtractKeyRangeLegacy)); } - AddHandler(0, &TCoExtendBase::Match, HNDL(ExtendDqReadWraps)); AddHandler(0, &TCoExtendBase::Match, HNDL(Extend)); AddHandler(0, &TCoAssumeSorted::Match, HNDL(AssumeSorted)); AddHandler(0, &TYtMapReduce::Match, HNDL(AddTrivialMapperForNativeYtTypes)); @@ -104,7 +103,6 @@ public: AddHandler(1, &TYtOutputOpBase::Match, HNDL(TableContentWithSettings)); AddHandler(1, &TYtOutputOpBase::Match, HNDL(NonOptimalTableContent)); AddHandler(1, &TCoRight::Match, HNDL(ReadWithSettings)); - AddHandler(1, &TDqReadWrapBase::Match, HNDL(DqReadWrapWithSettings)); AddHandler(1, &TYtTransientOpBase::Match, HNDL(PushDownKeyExtract)); AddHandler(1, &TYtTransientOpBase::Match, HNDL(TransientOpWithSettings)); AddHandler(1, &TYtSort::Match, HNDL(TopSort)); @@ -3456,77 +3454,6 @@ private: return KeepColumnOrder(res.Ptr(), node.Ref(), ctx, *State_->Types); } - TMaybeNode<TExprBase> ExtendDqReadWraps(TExprBase node, TExprContext& ctx) const { - auto extend = node.Cast<TCoExtendBase>(); - - // TODO: group TYtReadTable by token/settings: - // (Merge (DqReadWrap YtReadTable_group1) (DqReadWrap YtReadTable_group1) (DqReadWrap OtherReads)) - - TExprNode::TPtr dqReadWrapNode; - TExprNode::TPtr dataSource; - TVector<TExprBase> worlds; - TVector<TYtPath> paths; - TString prevToken; - TExprNode::TPtr settings; - - for (auto child: extend) { - auto maybeDqReadWrap = child.Maybe<TDqReadWrap>(); - if (maybeDqReadWrap && !maybeDqReadWrap.Cast().Flags().Size()) { - auto dqReadWrap = maybeDqReadWrap.Cast(); - auto maybeYtReadTable = dqReadWrap.Input().Maybe<TYtReadTable>(); - auto token = dqReadWrap.Token(); - TStringBuf tokenStr = token.IsValid() ? token.Name().Cast().Value() : TStringBuf(); - if (prevToken && prevToken != tokenStr) { - return node; - } - dqReadWrapNode = dqReadWrap.Ptr(); - if (maybeYtReadTable) { - auto ytReadTable = maybeYtReadTable.Cast(); - - if (ytReadTable.Input().Size() != 1) { - return node; - } - - auto section = ytReadTable.Input().Item(0); - if (settings && settings != section.Settings().Ptr()) { - return node; - } - if (dataSource && dataSource != ytReadTable.DataSource().Ptr()) { - return node; - } - dataSource = ytReadTable.DataSource().Ptr(); - settings = section.Settings().Ptr(); - paths.insert(paths.end(), section.Paths().begin(), section.Paths().end()); - worlds.push_back(ytReadTable.World()); - } else { - return node; - } - prevToken = tokenStr; - } else { - return node; - } - } - - auto newWorld = Build<TCoSync>(ctx, node.Pos()).Add(worlds).Done(); - - TYtReadTable read = Build<TYtReadTable>(ctx, node.Pos()) - .World(newWorld) - .DataSource(dataSource) - .Input() - .Add() - .Paths().Add(paths).Build() - .Settings(settings) - .Build() - .Build() - .Done(); - - return Build<TDqReadWrap>(ctx, node.Pos()) - .Input(read) - .Flags().Build() - .Token(TExprBase(dqReadWrapNode).Cast<TDqReadWrap>().Token()) - .Done(); - } - TMaybeNode<TExprBase> Extend(TExprBase node, TExprContext& ctx) const { auto extend = node.Cast<TCoExtendBase>(); @@ -5971,30 +5898,6 @@ private: return node; } - TMaybeNode<TExprBase> DqReadWrapWithSettings(TExprBase node, TExprContext& ctx) const { - auto maybeRead = node.Cast<TDqReadWrapBase>().Input().Maybe<TYtReadTable>(); - if (!maybeRead) { - return node; - } - - TYtDSource dataSource = GetDataSource(maybeRead.Cast(), ctx); - if (!State_->Configuration->_EnableYtPartitioning.Get(dataSource.Cluster().StringValue()).GetOrElse(false)) { - return node; - } - - auto read = maybeRead.Cast().Ptr(); - TSyncMap syncList; - auto ret = OptimizeReadWithSettings(read, false, false, syncList, State_, ctx); - if (ret != read) { - if (ret) { - YQL_ENSURE(syncList.empty()); - return TExprBase(ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(ret))); - } - return {}; - } - return node; - } - TMaybeNode<TExprBase> TransientOpWithSettings(TExprBase node, TExprContext& ctx) const { auto op = node.Cast<TYtTransientOpBase>(); diff --git a/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json index 4c7c031481..0d09422187 100644 --- a/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json @@ -3535,23 +3535,23 @@ ], "test.test[optimizers-group_visit_lambdas--Analyze]": [ { - "checksum": "dbd558e60768523c0770d01fafdf02fd", - "size": 10096, - "uri": "https://storage.yandex-team.ru/get-devtools/1903885/92966575648953dd33617f86cb7d011f63776c02/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Analyze_/plan.txt" + "checksum": "f67d1134a9741f02bf7fd74e3741e806", + "size": 8538, + "uri": "https://storage.yandex-team.ru/get-devtools/1942173/2b6d37b434944472410a121082ca65dee724c848/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Analyze_/plan.txt" } ], "test.test[optimizers-group_visit_lambdas--Debug]": [ { - "checksum": "67b0359fa47918adc10cd9243eb343ea", - "size": 4137, - "uri": "https://storage.yandex-team.ru/get-devtools/1599023/4953cf4336fe0f36631097ce0dd29ae39daef2ac/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Debug_/opt.yql_patched" + "checksum": "125e2c294928f783c1a47a5bc59455a9", + "size": 3885, + "uri": "https://storage.yandex-team.ru/get-devtools/1942415/7197d6f538e589afc0cd6fe1285c07fd138fb450/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Debug_/opt.yql_patched" } ], "test.test[optimizers-group_visit_lambdas--Plan]": [ { - "checksum": "dbd558e60768523c0770d01fafdf02fd", - "size": 10096, - "uri": "https://storage.yandex-team.ru/get-devtools/1942671/bb537deceaffedf5a0b76b9b9b6eef9131895ce9/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Plan_/plan.txt" + "checksum": "f67d1134a9741f02bf7fd74e3741e806", + "size": 8538, + "uri": "https://storage.yandex-team.ru/get-devtools/1942173/2b6d37b434944472410a121082ca65dee724c848/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Plan_/plan.txt" } ], "test.test[optimizers-group_visit_lambdas--Results]": [], @@ -5508,16 +5508,16 @@ ], "test.test[window-row_number_no_part_multi_input-default.txt-Debug]": [ { - "checksum": "5fdeb5b450d9ece72edf13994cc2d0fd", - "size": 4237, - "uri": "https://storage.yandex-team.ru/get-devtools/1814674/64e60e91443ad3deb3ec9f35bbd96fa975cf27f9/resource.tar.gz#test.test_window-row_number_no_part_multi_input-default.txt-Debug_/opt.yql_patched" + "checksum": "9e7aa3db17638cc6db0cb597e79157a9", + "size": 4016, + "uri": "https://storage.yandex-team.ru/get-devtools/1942415/7197d6f538e589afc0cd6fe1285c07fd138fb450/resource.tar.gz#test.test_window-row_number_no_part_multi_input-default.txt-Debug_/opt.yql_patched" } ], "test.test[window-row_number_no_part_multi_input-default.txt-Plan]": [ { - "checksum": "eaf54514644eb358ca323c8011a792c5", - "size": 12630, - "uri": "https://storage.yandex-team.ru/get-devtools/1942671/4db54c8ba9dedccdc8391210d1657c5ca4bd34ec/resource.tar.gz#test.test_window-row_number_no_part_multi_input-default.txt-Plan_/plan.txt" + "checksum": "834e185938e25a9a228ab14b7e9cd4df", + "size": 11231, + "uri": "https://storage.yandex-team.ru/get-devtools/1942173/2b6d37b434944472410a121082ca65dee724c848/resource.tar.gz#test.test_window-row_number_no_part_multi_input-default.txt-Plan_/plan.txt" } ], "test.test[window-row_number_no_part_multi_input-default.txt-Results]": [], diff --git a/ydb/library/yql/tests/sql/dq_file/part7/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part7/canondata/result.json index 033d81ef7c..4ce6b3959f 100644 --- a/ydb/library/yql/tests/sql/dq_file/part7/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part7/canondata/result.json @@ -2996,23 +2996,23 @@ "test.test[key_filter-lambda_with_null_filter--Results]": [], "test.test[key_filter-mixed_sort--Analyze]": [ { - "checksum": "82bd104b7140df61dd8704b8cbdf285b", - "size": 12814, - "uri": "https://storage.yandex-team.ru/get-devtools/1931696/0a5f01ad7bf7c863b92eab0e8aff7f87ecb60e51/resource.tar.gz#test.test_key_filter-mixed_sort--Analyze_/plan.txt" + "checksum": "a3f0ef78e59682e3c13e237f4ae1ed53", + "size": 9378, + "uri": "https://storage.yandex-team.ru/get-devtools/1773845/f96aa61ced92a0dfdc80e9b0bdf6f2a5cc2d3078/resource.tar.gz#test.test_key_filter-mixed_sort--Analyze_/plan.txt" } ], "test.test[key_filter-mixed_sort--Debug]": [ { - "checksum": "86dd8a54da71064e756cc19f63330d09", - "size": 4330, - "uri": "https://storage.yandex-team.ru/get-devtools/1931696/0a5f01ad7bf7c863b92eab0e8aff7f87ecb60e51/resource.tar.gz#test.test_key_filter-mixed_sort--Debug_/opt.yql_patched" + "checksum": "9e78d689b65f563bedaee3d0aafbb228", + "size": 3796, + "uri": "https://storage.yandex-team.ru/get-devtools/1773845/f96aa61ced92a0dfdc80e9b0bdf6f2a5cc2d3078/resource.tar.gz#test.test_key_filter-mixed_sort--Debug_/opt.yql_patched" } ], "test.test[key_filter-mixed_sort--Plan]": [ { - "checksum": "82bd104b7140df61dd8704b8cbdf285b", - "size": 12814, - "uri": "https://storage.yandex-team.ru/get-devtools/1937367/24a3ed09a524cab36402a50f39546eeec677142d/resource.tar.gz#test.test_key_filter-mixed_sort--Plan_/plan.txt" + "checksum": "a3f0ef78e59682e3c13e237f4ae1ed53", + "size": 9378, + "uri": "https://storage.yandex-team.ru/get-devtools/1773845/f96aa61ced92a0dfdc80e9b0bdf6f2a5cc2d3078/resource.tar.gz#test.test_key_filter-mixed_sort--Plan_/plan.txt" } ], "test.test[key_filter-mixed_sort--Results]": [], |