diff options
author | udovichenko-r <rvu@ydb.tech> | 2023-10-31 13:28:19 +0300 |
---|---|---|
committer | udovichenko-r <rvu@ydb.tech> | 2023-10-31 14:02:11 +0300 |
commit | 08b33bb3ed7001c64c5a1fc18371b9065f76e8b3 (patch) | |
tree | 0b8451c504eda15c19697a0baecf4542af3e1133 | |
parent | 0c72d00bdec4f92b4fdf6c170b43d9e22ae57e90 (diff) | |
download | ydb-08b33bb3ed7001c64c5a1fc18371b9065f76e8b3.tar.gz |
[dq] Interface for provider specific optimizers
YQL-16013
23 files changed, 320 insertions, 3 deletions
diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h index 2fc835a47a..95d7aee773 100644 --- a/ydb/library/yql/core/yql_data_provider.h +++ b/ydb/library/yql/core/yql_data_provider.h @@ -75,6 +75,7 @@ public: }; class IDqIntegration; +class IDqOptimization; class IOptimizationContext; @@ -174,6 +175,7 @@ public: // DQ virtual IDqIntegration* GetDqIntegration() = 0; + virtual IDqOptimization* GetDqOptimization() = 0; }; struct IPipelineConfigurator; diff --git a/ydb/library/yql/dq/integration/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/integration/CMakeLists.darwin-x86_64.txt index 2213c8b415..6ff4d7364c 100644 --- a/ydb/library/yql/dq/integration/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/dq/integration/CMakeLists.darwin-x86_64.txt @@ -23,4 +23,5 @@ target_link_libraries(yql-dq-integration PUBLIC ) target_sources(yql-dq-integration PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_integration.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_optimization.cpp ) diff --git a/ydb/library/yql/dq/integration/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/integration/CMakeLists.linux-aarch64.txt index f01517462e..1d4e492554 100644 --- a/ydb/library/yql/dq/integration/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/dq/integration/CMakeLists.linux-aarch64.txt @@ -24,4 +24,5 @@ target_link_libraries(yql-dq-integration PUBLIC ) target_sources(yql-dq-integration PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_integration.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_optimization.cpp ) diff --git a/ydb/library/yql/dq/integration/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/integration/CMakeLists.linux-x86_64.txt index f01517462e..1d4e492554 100644 --- a/ydb/library/yql/dq/integration/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/dq/integration/CMakeLists.linux-x86_64.txt @@ -24,4 +24,5 @@ target_link_libraries(yql-dq-integration PUBLIC ) target_sources(yql-dq-integration PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_integration.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_optimization.cpp ) diff --git a/ydb/library/yql/dq/integration/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/integration/CMakeLists.windows-x86_64.txt index 2213c8b415..6ff4d7364c 100644 --- a/ydb/library/yql/dq/integration/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/dq/integration/CMakeLists.windows-x86_64.txt @@ -23,4 +23,5 @@ target_link_libraries(yql-dq-integration PUBLIC ) target_sources(yql-dq-integration PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_integration.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_optimization.cpp ) diff --git a/ydb/library/yql/dq/integration/ya.make b/ydb/library/yql/dq/integration/ya.make index f57695f542..955405344a 100644 --- a/ydb/library/yql/dq/integration/ya.make +++ b/ydb/library/yql/dq/integration/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( yql_dq_integration.cpp + yql_dq_optimization.cpp ) PEERDIR( diff --git a/ydb/library/yql/dq/integration/yql_dq_optimization.cpp b/ydb/library/yql/dq/integration/yql_dq_optimization.cpp new file mode 100644 index 0000000000..b7f55b1f36 --- /dev/null +++ b/ydb/library/yql/dq/integration/yql_dq_optimization.cpp @@ -0,0 +1 @@ +#include "yql_dq_optimization.h" diff --git a/ydb/library/yql/dq/integration/yql_dq_optimization.h b/ydb/library/yql/dq/integration/yql_dq_optimization.h new file mode 100644 index 0000000000..5177e50c96 --- /dev/null +++ b/ydb/library/yql/dq/integration/yql_dq_optimization.h @@ -0,0 +1,75 @@ +#pragma once + +#include <ydb/library/yql/ast/yql_expr.h> + +namespace NYql { + +class IDqOptimization { +public: + virtual ~IDqOptimization() {} + + /** + Rewrite provider reader under DqReadWrap + Args: + * reader - provider specific callable of reader + * ctx - expr context + Returns one of: + * empty TPtr on error + * original `reader` if no changes + * new reader if any optimizations + */ + virtual TExprNode::TPtr RewriteRead(const TExprNode::TPtr& reader, TExprContext& ctx) = 0; + + /** + Apply new members subset for DqReadWrap's underlying provider reader + Args: + * reader - provider specific callable of reader + * members - expr list of atoms with new members + * ctx - expr context + Returns one of: + * empty TPtr on error + * original `reader` if no changes + * new reader with applyed new members + */ + virtual TExprNode::TPtr ApplyExtractMembers(const TExprNode::TPtr& reader, const TExprNode::TPtr& members, TExprContext& ctx) = 0; + + /** + Apply `take` or `skip` setting for DqReadWrap's underlying provider reader + Args: + * reader - provider specific callable of reader + * countBase - `Take`, `Skip` or `Limit` callable + * ctx - expr context + Returns one of: + * empty TPtr on error + * original `reader` if no changes + * new reader with applyed setting + */ + virtual TExprNode::TPtr ApplyTakeOrSkip(const TExprNode::TPtr& reader, const TExprNode::TPtr& countBase, TExprContext& ctx) = 0; + + /** + Apply `unordered` setting for DqReadWrap's underlying provider reader + Args: + * reader - provider specific callable of reader + * ctx - expr context + Returns one of: + * empty TPtr on error + * original `reader` if no changes + * new reader with applyed setting + */ + virtual TExprNode::TPtr ApplyUnordered(const TExprNode::TPtr& reader, TExprContext& ctx) = 0; + + /** + Optimize list/stream extend for set of DqReadWrap's underlying provider readers + Args: + * listOfReader - expr list of provider specific readers + * ordered - `true` for ordered extend (should keep original order of readres), `false` if readers may be reordred after optimization + * ctx - expr context + Returns one of: + * empty list on error + * original `listOfReader` if no changes + * new optimized list of readers. Returned list length should not be greater than original `listOfReader` length + */ + virtual TExprNode::TListType ApplyExtend(const TExprNode::TListType& listOfReader, bool ordered, TExprContext& ctx) = 0; +}; + +} // namespace NYql diff --git a/ydb/library/yql/providers/common/dq/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/common/dq/CMakeLists.darwin-x86_64.txt index 1070128188..54fa168d21 100644 --- a/ydb/library/yql/providers/common/dq/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/common/dq/CMakeLists.darwin-x86_64.txt @@ -15,4 +15,5 @@ target_link_libraries(providers-common-dq PUBLIC ) target_sources(providers-common-dq PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp ) diff --git a/ydb/library/yql/providers/common/dq/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/dq/CMakeLists.linux-aarch64.txt index 3fa3069dbc..a99519cbba 100644 --- a/ydb/library/yql/providers/common/dq/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/common/dq/CMakeLists.linux-aarch64.txt @@ -16,4 +16,5 @@ target_link_libraries(providers-common-dq PUBLIC ) target_sources(providers-common-dq PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp ) diff --git a/ydb/library/yql/providers/common/dq/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/common/dq/CMakeLists.linux-x86_64.txt index 3fa3069dbc..a99519cbba 100644 --- a/ydb/library/yql/providers/common/dq/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/common/dq/CMakeLists.linux-x86_64.txt @@ -16,4 +16,5 @@ target_link_libraries(providers-common-dq PUBLIC ) target_sources(providers-common-dq PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp ) diff --git a/ydb/library/yql/providers/common/dq/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/common/dq/CMakeLists.windows-x86_64.txt index 1070128188..54fa168d21 100644 --- a/ydb/library/yql/providers/common/dq/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/common/dq/CMakeLists.windows-x86_64.txt @@ -15,4 +15,5 @@ target_link_libraries(providers-common-dq PUBLIC ) target_sources(providers-common-dq PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp ) diff --git a/ydb/library/yql/providers/common/dq/ya.make b/ydb/library/yql/providers/common/dq/ya.make index 769d0fd727..9fe8d72ade 100644 --- a/ydb/library/yql/providers/common/dq/ya.make +++ b/ydb/library/yql/providers/common/dq/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( yql_dq_integration_impl.cpp + yql_dq_optimization_impl.cpp ) PEERDIR( diff --git a/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp new file mode 100644 index 0000000000..8dac570051 --- /dev/null +++ b/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp @@ -0,0 +1,25 @@ +#include "yql_dq_optimization_impl.h" + +namespace NYql { + +TExprNode::TPtr TDqOptimizationBase::RewriteRead(const TExprNode::TPtr& reader, TExprContext& /*ctx*/) { + return reader; +} + +TExprNode::TPtr TDqOptimizationBase::ApplyExtractMembers(const TExprNode::TPtr& reader, const TExprNode::TPtr& /*members*/, TExprContext& /*ctx*/) { + return reader; +} + +TExprNode::TPtr TDqOptimizationBase::ApplyTakeOrSkip(const TExprNode::TPtr& reader, const TExprNode::TPtr& /*countBase*/, TExprContext& /*ctx*/) { + return reader; +} + +TExprNode::TPtr TDqOptimizationBase::ApplyUnordered(const TExprNode::TPtr& reader, TExprContext& /*ctx*/) { + return reader; +} + +TExprNode::TListType TDqOptimizationBase::ApplyExtend(const TExprNode::TListType& listOfReader, bool /*ordered*/, TExprContext& /*ctx*/) { + return listOfReader; +} + +} // namespace NYql diff --git a/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.h b/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.h new file mode 100644 index 0000000000..5817d68250 --- /dev/null +++ b/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.h @@ -0,0 +1,16 @@ +#pragma once + +#include <ydb/library/yql/dq/integration/yql_dq_optimization.h> + +namespace NYql { + +class TDqOptimizationBase: public IDqOptimization { +public: + TExprNode::TPtr RewriteRead(const TExprNode::TPtr& reader, TExprContext& ctx) override; + TExprNode::TPtr ApplyExtractMembers(const TExprNode::TPtr& reader, const TExprNode::TPtr& members, TExprContext& ctx) override; + TExprNode::TPtr ApplyTakeOrSkip(const TExprNode::TPtr& reader, const TExprNode::TPtr& countBase, TExprContext& ctx) override; + TExprNode::TPtr ApplyUnordered(const TExprNode::TPtr& reader, TExprContext& ctx) override; + TExprNode::TListType ApplyExtend(const TExprNode::TListType& listOfReader, bool ordered, TExprContext& ctx) override; +}; + +} // namespace NYql diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp index f4e8a8ce1b..62d4cb6ae4 100644 --- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp +++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp @@ -321,6 +321,10 @@ IDqIntegration* TDataProviderBase::GetDqIntegration() { return nullptr; } +IDqOptimization* TDataProviderBase::GetDqOptimization() { + return nullptr; +} + TExprNode::TPtr DefaultCleanupWorld(const TExprNode::TPtr& node, TExprContext& ctx) { auto root = node; auto status = OptimizeExpr(root, root, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr { diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h index 445ffa1c18..4c4e88013d 100644 --- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h +++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h @@ -88,6 +88,7 @@ public: ITrackableNodeProcessor& GetTrackableNodeProcessor() override; IGraphTransformer& GetPlanInfoTransformer() override; IDqIntegration* GetDqIntegration() override; + IDqOptimization* GetDqOptimization() override; protected: THolder<IGraphTransformer> DefConstraintTransformer_; diff --git a/ydb/library/yql/providers/dq/opt/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/opt/CMakeLists.darwin-x86_64.txt index 652509b9b5..d7b3651197 100644 --- a/ydb/library/yql/providers/dq/opt/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/dq/opt/CMakeLists.darwin-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(providers-dq-opt PUBLIC yql-utils-log yql-dq-opt yql-dq-type_ann + yql-dq-integration library-yql-core yql-core-peephole_opt yql-core-type_ann diff --git a/ydb/library/yql/providers/dq/opt/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/opt/CMakeLists.linux-aarch64.txt index c39d109453..7e6d60a07c 100644 --- a/ydb/library/yql/providers/dq/opt/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/dq/opt/CMakeLists.linux-aarch64.txt @@ -22,6 +22,7 @@ target_link_libraries(providers-dq-opt PUBLIC yql-utils-log yql-dq-opt yql-dq-type_ann + yql-dq-integration library-yql-core yql-core-peephole_opt yql-core-type_ann diff --git a/ydb/library/yql/providers/dq/opt/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/opt/CMakeLists.linux-x86_64.txt index c39d109453..7e6d60a07c 100644 --- a/ydb/library/yql/providers/dq/opt/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/dq/opt/CMakeLists.linux-x86_64.txt @@ -22,6 +22,7 @@ target_link_libraries(providers-dq-opt PUBLIC yql-utils-log yql-dq-opt yql-dq-type_ann + yql-dq-integration library-yql-core yql-core-peephole_opt yql-core-type_ann diff --git a/ydb/library/yql/providers/dq/opt/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/dq/opt/CMakeLists.windows-x86_64.txt index 652509b9b5..d7b3651197 100644 --- a/ydb/library/yql/providers/dq/opt/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/dq/opt/CMakeLists.windows-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(providers-dq-opt PUBLIC yql-utils-log yql-dq-opt yql-dq-type_ann + yql-dq-integration library-yql-core yql-core-peephole_opt yql-core-type_ann diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp index 90952bd879..64eba04b14 100644 --- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp @@ -5,9 +5,10 @@ #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> #include <ydb/library/yql/providers/common/transform/yql_optimize.h> #include <ydb/library/yql/dq/opt/dq_opt_join.h> +#include <ydb/library/yql/dq/integration/yql_dq_optimization.h> #include <ydb/library/yql/dq/opt/dq_opt_log.h> -#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/dq/opt/dq_opt.h> +#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/core/cbo/cbo_optimizer.h> @@ -55,6 +56,10 @@ public: { #define HNDL(name) "DqsLogical-"#name, Hndl(&TDqsLogicalOptProposalTransformer::name) AddHandler(0, &TCoUnorderedBase::Match, HNDL(SkipUnordered)); + AddHandler(0, &TCoUnorderedBase::Match, HNDL(UnorderedOverDqReadWrap)); + AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqReadWrap)); + AddHandler(0, &TCoCountBase::Match, HNDL(TakeOrSkipOverDqReadWrap)); + AddHandler(0, &TCoExtendBase::Match, HNDL(ExtendOverDqReadWrap)); AddHandler(0, &TCoAggregateBase::Match, HNDL(RewriteAggregate)); AddHandler(0, &TCoTake::Match, HNDL(RewriteTakeSortToTopSort)); AddHandler(0, &TCoEquiJoin::Match, HNDL(OptimizeEquiJoinWithCosts)); @@ -65,19 +70,168 @@ public: AddHandler(0, &TCoFlatMapBase::Match, HNDL(FlatMapOverExtend)); AddHandler(0, &TDqQuery::Match, HNDL(MergeQueriesWithSinks)); AddHandler(0, &TCoSqlIn::Match, HNDL(SqlInDropCompact)); + AddHandler(0, &TDqReadWrapBase::Match, HNDL(DqReadWrapByProvider)); #undef HNDL } protected: TMaybeNode<TExprBase> SkipUnordered(TExprBase node, TExprContext& ctx) { Y_UNUSED(ctx); - auto unordered = node.Cast<TCoUnorderedBase>(); + const auto unordered = node.Cast<TCoUnorderedBase>(); if (unordered.Input().Maybe<TDqConnection>()) { return unordered.Input(); } return node; } + TMaybeNode<TExprBase> UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { + const auto unordered = node.Cast<TCoUnorderedBase>(); + if (const auto maybeRead = unordered.Input().Maybe<TDqReadWrapBase>().Input()) { + if (Config->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate)) { + const TParentsMap* parentsMap = getParents(); + auto parentsIt = parentsMap->find(unordered.Input().Raw()); + YQL_ENSURE(parentsIt != parentsMap->cend()); + if (parentsIt->second.size() > 1) { + return node; + } + } + auto providerRead = maybeRead.Cast(); + if (auto dqOpt = GetDqOptCallback(providerRead)) { + auto updatedRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx); + if (!updatedRead) { + return {}; + } + if (updatedRead != providerRead.Ptr()) { + return TExprBase(ctx.ChangeChild(unordered.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); + } + } + } + + return node; + } + + TMaybeNode<TExprBase> ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { + auto extract = node.Cast<TCoExtractMembers>(); + if (const auto maybeRead = extract.Input().Maybe<TDqReadWrap>().Input()) { + if (Config->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate)) { + const TParentsMap* parentsMap = getParents(); + auto parentsIt = parentsMap->find(extract.Input().Raw()); + YQL_ENSURE(parentsIt != parentsMap->cend()); + if (parentsIt->second.size() > 1) { + return node; + } + } + + auto providerRead = maybeRead.Cast(); + if (auto dqOpt = GetDqOptCallback(providerRead)) { + auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), extract.Members().Ptr(), ctx); + if (!updatedRead) { + return {}; + } + if (updatedRead != providerRead.Ptr()) { + return TExprBase(ctx.ChangeChild(extract.Input().Ref(), TDqReadWrap::idx_Input, std::move(updatedRead))); + } + } + } + + return node; + } + + TMaybeNode<TExprBase> TakeOrSkipOverDqReadWrap(TExprBase node, TExprContext& ctx) { + auto countBase = node.Cast<TCoCountBase>(); + + // TODO: support via precomputes + if (!TCoIntegralCtor::Match(countBase.Count().Raw())) { + return node; + } + + if (const auto maybeRead = countBase.Input().Maybe<TDqReadWrapBase>().Input()) { + auto providerRead = maybeRead.Cast(); + if (auto dqOpt = GetDqOptCallback(providerRead)) { + auto updatedRead = dqOpt->ApplyTakeOrSkip(providerRead.Ptr(), countBase.Ptr(), ctx); + if (!updatedRead) { + return {}; + } + if (updatedRead != providerRead.Ptr()) { + return TExprBase(ctx.ChangeChild(countBase.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); + } + } + } + + return node; + } + + TMaybeNode<TExprBase> ExtendOverDqReadWrap(TExprBase node, TExprContext& ctx) const { + auto extend = node.Cast<TCoExtendBase>(); + const bool ordered = node.Maybe<TCoOrderedExtend>().IsValid(); + const TExprNode* flags = nullptr; + const TExprNode* token = nullptr; + bool first = true; + std::unordered_map<IDqOptimization*, std::vector<std::pair<size_t, TExprNode::TPtr>>> readers; + IDqOptimization* prevDqOpt = nullptr; + for (size_t i = 0; i < extend.ArgCount(); ++i) { + const auto child = extend.Arg(i); + if (!TDqReadWrapBase::Match(child.Raw())) { + prevDqOpt = nullptr; + continue; + } + auto dqReadWrap = child.Cast<TDqReadWrapBase>(); + + if (first) { + flags = dqReadWrap.Flags().Raw(); + token = dqReadWrap.Token().Raw(); + first = false; + } else if (flags != dqReadWrap.Flags().Raw() || token != dqReadWrap.Token().Raw()) { + prevDqOpt = nullptr; + continue; + } + IDqOptimization* dqOpt = GetDqOptCallback(dqReadWrap.Input()); + if (!dqOpt) { + prevDqOpt = nullptr; + continue; + } + if (ordered && prevDqOpt != dqOpt) { + readers[dqOpt].assign(1, std::make_pair(i, dqReadWrap.Input().Ptr())); + } else { + readers[dqOpt].emplace_back(i, dqReadWrap.Input().Ptr()); + } + prevDqOpt = dqOpt; + } + + if (readers.empty() || AllOf(readers, [](const auto& item) { return item.second.size() < 2; })) { + return node; + } + + TExprNode::TListType newChildren = extend.Ref().ChildrenList(); + for (auto& [dqOpt, list]: readers) { + if (list.size() > 1) { + TExprNode::TListType inReaders; + std::transform(list.begin(), list.end(), std::back_inserter(inReaders), [](const auto& item) { return item.second; }); + TExprNode::TListType outReaders = dqOpt->ApplyExtend(inReaders, ordered, ctx); + if (outReaders.empty()) { + return {}; + } + if (inReaders != outReaders) { + YQL_ENSURE(outReaders.size() <= inReaders.size()); + } + size_t i = 0; + for (; i < outReaders.size(); ++i) { + newChildren[list[i].first] = ctx.ChangeChild(*newChildren[list[i].first], TDqReadWrapBase::idx_Input, std::move(outReaders[i])); + } + for (; i < list.size(); ++i) { + newChildren[list[i].first] = nullptr; + } + } + } + newChildren.erase(std::remove(newChildren.begin(), newChildren.end(), TExprNode::TPtr{}), newChildren.end()); + YQL_ENSURE(!newChildren.empty()); + if (newChildren.size() > 1) { + return TExprBase(ctx.ChangeChildren(extend.Ref(), std::move(newChildren))); + } else { + return TExprBase(newChildren.front()); + } + } + TMaybeNode<TExprBase> FlatMapOverExtend(TExprBase node, TExprContext& ctx) { return DqFlatMapOverExtend(node, ctx); } @@ -117,7 +271,7 @@ protected: TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) { if (TypesCtx.CostBasedOptimizer != ECostBasedOptimizerType::Disable) { - std::function<void(const TString&)> log = [&](auto str) { + std::function<void(const TString&)> log = [&](auto str) { YQL_CLOG(INFO, ProviderDq) << str; }; std::function<IOptimizer*(IOptimizer::TInput&&)> factory = [&](auto input) { @@ -176,6 +330,20 @@ protected: return DqSqlInDropCompact(node, ctx); } + TMaybeNode<TExprBase> DqReadWrapByProvider(TExprBase node, TExprContext& ctx) const { + auto providerRead = node.Cast<TDqReadWrapBase>().Input(); + if (auto dqOpt = GetDqOptCallback(providerRead)) { + auto updatedRead = dqOpt->RewriteRead(providerRead.Ptr(), ctx); + if (!updatedRead) { + return {}; + } + if (updatedRead != providerRead.Ptr()) { + return TExprBase(ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead))); + } + } + return node; + } + private: TMaybeNode<TExprBase> RewriteAsHoppingWindow(const TExprBase node, TExprContext& ctx, const TDqConnection& input) { const auto aggregate = node.Cast<TCoAggregate>(); @@ -916,6 +1084,16 @@ private: return enableWatermarks; } + IDqOptimization* GetDqOptCallback(const TExprBase& providerRead) const { + if (providerRead.Ref().ChildrenSize() > 1 && TCoDataSource::Match(providerRead.Ref().Child(1))) { + auto dataSourceName = providerRead.Ref().Child(1)->Child(0)->Content(); + auto datasource = TypesCtx.DataSourceMap.FindPtr(dataSourceName); + YQL_ENSURE(datasource); + return (*datasource)->GetDqOptimization(); + } + return nullptr; + } + private: TDqConfiguration::TPtr Config; TTypeAnnotationContext& TypesCtx; diff --git a/ydb/library/yql/providers/dq/opt/ya.make b/ydb/library/yql/providers/dq/opt/ya.make index 63d88fe7bb..842d413aff 100644 --- a/ydb/library/yql/providers/dq/opt/ya.make +++ b/ydb/library/yql/providers/dq/opt/ya.make @@ -8,6 +8,7 @@ PEERDIR( ydb/library/yql/utils/log ydb/library/yql/dq/opt ydb/library/yql/dq/type_ann + ydb/library/yql/dq/integration ydb/library/yql/core ydb/library/yql/core/peephole_opt ydb/library/yql/core/type_ann |