diff options
author | aozeritsky <aozeritsky@ydb.tech> | 2023-10-02 11:56:24 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@ydb.tech> | 2023-10-02 12:24:06 +0300 |
commit | a6afe47d107404d22b8a6c592dccdc913d6130c3 (patch) | |
tree | 72558aab92be822f179dbf29527adaf3ff8f0859 | |
parent | 0674445585038abd4dfc60b2f709cd29f18b65fe (diff) | |
download | ydb-a6afe47d107404d22b8a6c592dccdc913d6130c3.tar.gz |
Create statistics transformer base class
-rw-r--r-- | ydb/core/kqp/opt/kqp_statistics_transformer.cpp | 133 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_statistics_transformer.h | 16 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.cpp | 107 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.h | 26 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/ya.make | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/yql_dq_statistics.cpp | 95 |
10 files changed, 217 insertions, 165 deletions
diff --git a/ydb/core/kqp/opt/kqp_statistics_transformer.cpp b/ydb/core/kqp/opt/kqp_statistics_transformer.cpp index a10a9328c5f..5680bc3c830 100644 --- a/ydb/core/kqp/opt/kqp_statistics_transformer.cpp +++ b/ydb/core/kqp/opt/kqp_statistics_transformer.cpp @@ -186,108 +186,55 @@ void AppendTxStats(const TExprNode::TPtr& input, TTypeAnnotationContext* typeCtx IGraphTransformer::TStatus TKqpStatisticsTransformer::DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) { - Y_UNUSED(ctx); output = input; if (!Config->HasOptEnableCostBasedOptimization()) { return IGraphTransformer::TStatus::Ok; } - TOptimizeExprSettings settings(nullptr); - - TVector<TVector<std::shared_ptr<TOptimizerStatistics>>> txStats; - - VisitExprLambdasLast( - input, [*this, &txStats](const TExprNode::TPtr& input) { - - // Generic matchers - if (TCoFilterBase::Match(input.Get())){ - InferStatisticsForFilter(input, TypeCtx); - } - else if(TCoSkipNullMembers::Match(input.Get())){ - InferStatisticsForSkipNullMembers(input, TypeCtx); - } - else if(TCoExtractMembers::Match(input.Get())){ - InferStatisticsForExtractMembers(input, TypeCtx); - } - else if(TCoAggregateCombine::Match(input.Get())){ - InferStatisticsForAggregateCombine(input, TypeCtx); - } - else if(TCoAggregateMergeFinalize::Match(input.Get())){ - InferStatisticsForAggregateMergeFinalize(input, TypeCtx); - } - - // KQP Matchers - else if(TKqlReadTableBase::Match(input.Get()) || TKqlReadTableRangesBase::Match(input.Get())){ - InferStatisticsForReadTable(input, TypeCtx, KqpCtx); - } - else if(TKqlLookupTableBase::Match(input.Get())) { - InferStatisticsForLookupTable(input, TypeCtx); - } - else if(TKqlLookupIndexBase::Match(input.Get())){ - InferStatisticsForIndexLookup(input, TypeCtx); - } - else if(TKqpTable::Match(input.Get())) { - InferStatisticsForKqpTable(input, TypeCtx, KqpCtx); - } - else if (TKqpReadRangesSourceSettings::Match(input.Get())) { - InferStatisticsForRowsSourceSettings(input, TypeCtx); - } - - // Join matchers - else if(TCoMapJoinCore::Match(input.Get())) { - InferStatisticsForMapJoin(input, TypeCtx); - } - else if(TCoGraceJoinCore::Match(input.Get())) { - InferStatisticsForGraceJoin(input, TypeCtx); - } - - // Do nothing in case of EquiJoin, otherwise the EquiJoin rule won't fire - else if(TCoEquiJoin::Match(input.Get())){ - } - - // In case of DqSource, propagate the statistics from the correct argument - else if (TDqSource::Match(input.Get())) { - InferStatisticsForDqSource(input, TypeCtx); - } - - // Match a result binding atom and connect it to a stage - else if(TCoParameter::Match(input.Get())) { - InferStatisticsForResultBinding(input, TypeCtx, txStats); - } - - // Finally, use a default rule to propagate the statistics and costs - else { - - // default sum propagation - if (input->ChildrenSize() >= 1) { - auto stats = TypeCtx->GetStats(input->ChildRef(0).Get()); - if (stats) { - TypeCtx->SetStats(input.Get(), stats); - } - } - } + TxStats.clear(); + return TDqStatisticsTransformerBase::DoTransform(input, output, ctx); +} - // We have a separate rule for all callables that may use a lambda - // we need to take each generic callable and see if it includes a lambda - // if so - we will map the input to the callable to the argument of the lambda - if (input->IsCallable()) { - PropagateStatisticsToLambdaArgument(input, TypeCtx); - } +bool TKqpStatisticsTransformer::BeforeLambdasSpecific(const TExprNode::TPtr& input, TExprContext& ctx) { + Y_UNUSED(ctx); + bool matched = true; + // KQP Matchers + if(TKqlReadTableBase::Match(input.Get()) || TKqlReadTableRangesBase::Match(input.Get())){ + InferStatisticsForReadTable(input, TypeCtx, KqpCtx); + } + else if(TKqlLookupTableBase::Match(input.Get())) { + InferStatisticsForLookupTable(input, TypeCtx); + } + else if(TKqlLookupIndexBase::Match(input.Get())){ + InferStatisticsForIndexLookup(input, TypeCtx); + } + else if(TKqpTable::Match(input.Get())) { + InferStatisticsForKqpTable(input, TypeCtx, KqpCtx); + } + else if (TKqpReadRangesSourceSettings::Match(input.Get())) { + InferStatisticsForRowsSourceSettings(input, TypeCtx); + } - return true; }, + // Match a result binding atom and connect it to a stage + else if(TCoParameter::Match(input.Get())) { + InferStatisticsForResultBinding(input, TypeCtx, TxStats); + } + else { + matched = false; + } - [*this, &txStats](const TExprNode::TPtr& input) { - if (TDqStageBase::Match(input.Get())) { - InferStatisticsForStage(input, TypeCtx); - } else if (TKqpPhysicalTx::Match(input.Get())) { - AppendTxStats(input, TypeCtx, txStats); - } else if (TCoFlatMapBase::Match(input.Get())) { - InferStatisticsForFlatMap(input, TypeCtx); - } + return matched; +} - return true; - }); - return IGraphTransformer::TStatus::Ok; +bool TKqpStatisticsTransformer::AfterLambdasSpecific(const TExprNode::TPtr& input, TExprContext& ctx) { + Y_UNUSED(ctx); + bool matched = true; + if (TKqpPhysicalTx::Match(input.Get())) { + AppendTxStats(input, TypeCtx, TxStats); + } else { + matched = false; + } + return matched; } TAutoPtr<IGraphTransformer> NKikimr::NKqp::CreateKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, diff --git a/ydb/core/kqp/opt/kqp_statistics_transformer.h b/ydb/core/kqp/opt/kqp_statistics_transformer.h index d1715e461e0..3f4d9a3a39c 100644 --- a/ydb/core/kqp/opt/kqp_statistics_transformer.h +++ b/ydb/core/kqp/opt/kqp_statistics_transformer.h @@ -10,6 +10,7 @@ #include <ydb/library/yql/core/yql_expr_type_annotation.h> #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> #include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.h> namespace NKikimr { namespace NKqp { @@ -24,26 +25,25 @@ using namespace NOpt; * but will simply stop propagation if in encounters an operator that it has no rules for. * One of such operators is EquiJoin, but there is a special rule to handle EquiJoin. */ -class TKqpStatisticsTransformer : public TSyncTransformerBase { +class TKqpStatisticsTransformer : public NYql::NDq::TDqStatisticsTransformerBase { - TTypeAnnotationContext* TypeCtx; const TKikimrConfiguration::TPtr& Config; const TKqpOptimizeContext& KqpCtx; + TVector<TVector<std::shared_ptr<TOptimizerStatistics>>> TxStats; public: TKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config) : - TypeCtx(&typeCtx), + TDqStatisticsTransformerBase(&typeCtx), Config(config), KqpCtx(*kqpCtx) {} // Main method of the transformer IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; - - // Rewind currently does nothing - void Rewind() { - - } + + private: + bool BeforeLambdasSpecific(const TExprNode::TPtr& input, TExprContext& ctx) final; + bool AfterLambdasSpecific(const TExprNode::TPtr& input, TExprContext& ctx) final; }; TAutoPtr<IGraphTransformer> CreateKqpStatisticsTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, diff --git a/ydb/library/yql/dq/opt/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/opt/CMakeLists.darwin-x86_64.txt index ee9362aeb62..288c9393507 100644 --- a/ydb/library/yql/dq/opt/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/dq/opt/CMakeLists.darwin-x86_64.txt @@ -33,6 +33,7 @@ target_sources(yql-dq-opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_phy.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_stat.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_join_cost_based_generic.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_predicate_selectivity.cpp diff --git a/ydb/library/yql/dq/opt/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/opt/CMakeLists.linux-aarch64.txt index d538669abb5..7768a5d0dc4 100644 --- a/ydb/library/yql/dq/opt/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/dq/opt/CMakeLists.linux-aarch64.txt @@ -34,6 +34,7 @@ target_sources(yql-dq-opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_phy.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_stat.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_join_cost_based_generic.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_predicate_selectivity.cpp diff --git a/ydb/library/yql/dq/opt/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/opt/CMakeLists.linux-x86_64.txt index d538669abb5..7768a5d0dc4 100644 --- a/ydb/library/yql/dq/opt/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/dq/opt/CMakeLists.linux-x86_64.txt @@ -34,6 +34,7 @@ target_sources(yql-dq-opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_phy.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_stat.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_join_cost_based_generic.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_predicate_selectivity.cpp diff --git a/ydb/library/yql/dq/opt/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/opt/CMakeLists.windows-x86_64.txt index ee9362aeb62..288c9393507 100644 --- a/ydb/library/yql/dq/opt/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/dq/opt/CMakeLists.windows-x86_64.txt @@ -33,6 +33,7 @@ target_sources(yql-dq-opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_phy_finalizing.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_phy.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_stat.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_join_cost_based_generic.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/opt/dq_opt_predicate_selectivity.cpp diff --git a/ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.cpp b/ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.cpp new file mode 100644 index 00000000000..4284d30ceca --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.cpp @@ -0,0 +1,107 @@ +#include "dq_opt_stat_transformer_base.h" + +#include <ydb/library/yql/dq/opt/dq_opt_stat.h> +#include <ydb/library/yql/core/yql_expr_optimize.h> + +namespace NYql::NDq { + +using namespace NNodes; + +TDqStatisticsTransformerBase::TDqStatisticsTransformerBase(TTypeAnnotationContext* typeCtx) + : TypeCtx(typeCtx) +{ } + +IGraphTransformer::TStatus TDqStatisticsTransformerBase::DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) { + output = input; + VisitExprLambdasLast( + input, [&](const TExprNode::TPtr& input) { + BeforeLambdas(input, ctx) || BeforeLambdasSpecific(input, ctx) || BeforeLambdasUnmatched(input, ctx); + + // We have a separate rule for all callables that may use a lambda + // we need to take each generic callable and see if it includes a lambda + // if so - we will map the input to the callable to the argument of the lambda + if (input->IsCallable()) { + PropagateStatisticsToLambdaArgument(input, TypeCtx); + } + + return true; + }, + [&](const TExprNode::TPtr& input) { + return AfterLambdas(input, ctx) || AfterLambdasSpecific(input, ctx) || true; + }); + return IGraphTransformer::TStatus::Ok; +} + +bool TDqStatisticsTransformerBase::BeforeLambdas(const TExprNode::TPtr& input, TExprContext& ctx) +{ + Y_UNUSED(ctx); + bool matched = true; + // Generic matchers + if (TCoFilterBase::Match(input.Get())){ + InferStatisticsForFilter(input, TypeCtx); + } + else if(TCoSkipNullMembers::Match(input.Get())){ + InferStatisticsForSkipNullMembers(input, TypeCtx); + } + else if(TCoExtractMembers::Match(input.Get())){ + InferStatisticsForExtractMembers(input, TypeCtx); + } + else if(TCoAggregateCombine::Match(input.Get())){ + InferStatisticsForAggregateCombine(input, TypeCtx); + } + else if(TCoAggregateMergeFinalize::Match(input.Get())){ + InferStatisticsForAggregateMergeFinalize(input, TypeCtx); + } + + // Join matchers + else if(TCoMapJoinCore::Match(input.Get())) { + InferStatisticsForMapJoin(input, TypeCtx); + } + else if(TCoGraceJoinCore::Match(input.Get())) { + InferStatisticsForGraceJoin(input, TypeCtx); + } + + // Do nothing in case of EquiJoin, otherwise the EquiJoin rule won't fire + else if(TCoEquiJoin::Match(input.Get())){ + } + + // In case of DqSource, propagate the statistics from the correct argument + else if (TDqSource::Match(input.Get())) { + InferStatisticsForDqSource(input, TypeCtx); + } + else { + matched = false; + } + + return matched; +} + +bool TDqStatisticsTransformerBase::BeforeLambdasUnmatched(const TExprNode::TPtr& input, TExprContext& ctx) +{ + Y_UNUSED(ctx); + if (input->ChildrenSize() >= 1) { + auto stats = TypeCtx->GetStats(input->ChildRef(0).Get()); + if (stats) { + TypeCtx->SetStats(input.Get(), stats); + } + } + return true; +} + +bool TDqStatisticsTransformerBase::AfterLambdas(const TExprNode::TPtr& input, TExprContext& ctx) { + Y_UNUSED(ctx); + bool matched = true; + if (TDqStageBase::Match(input.Get())) { + InferStatisticsForStage(input, TypeCtx); + } else if (TCoFlatMapBase::Match(input.Get())) { + InferStatisticsForFlatMap(input, TypeCtx); + } else { + matched = false; + } + return matched; +} + +void TDqStatisticsTransformerBase::Rewind() { } + +} // namespace NYql::NDq + diff --git a/ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.h b/ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.h new file mode 100644 index 00000000000..45ff27acc31 --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.h @@ -0,0 +1,26 @@ +#pragma once + +#include <ydb/library/yql/core/yql_graph_transformer.h> +#include <ydb/library/yql/core/yql_type_annotation.h> + +namespace NYql::NDq { + +class TDqStatisticsTransformerBase : public TSyncTransformerBase { +public: + TDqStatisticsTransformerBase(TTypeAnnotationContext* typeCtx); + + IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override; + void Rewind() override; + +protected: + virtual bool BeforeLambdasSpecific(const TExprNode::TPtr& input, TExprContext& ctx) = 0; + virtual bool AfterLambdasSpecific(const TExprNode::TPtr& input, TExprContext& ctx) = 0; + + bool BeforeLambdasUnmatched(const TExprNode::TPtr& input, TExprContext& ctx); + bool BeforeLambdas(const TExprNode::TPtr& input, TExprContext& ctx); + bool AfterLambdas(const TExprNode::TPtr& input, TExprContext& ctx); + + TTypeAnnotationContext* TypeCtx; +}; + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/ya.make b/ydb/library/yql/dq/opt/ya.make index 4efc0728544..15c6c03dafa 100644 --- a/ydb/library/yql/dq/opt/ya.make +++ b/ydb/library/yql/dq/opt/ya.make @@ -20,6 +20,7 @@ SRCS( dq_opt_phy_finalizing.cpp dq_opt_phy.cpp dq_opt_stat.cpp + dq_opt_stat_transformer_base.cpp dq_opt_join_cost_based.cpp dq_opt_join_cost_based_generic.cpp dq_opt_predicate_selectivity.cpp diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_statistics.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_statistics.cpp index c6d91e97206..4d44e7cd03f 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_statistics.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_statistics.cpp @@ -2,6 +2,7 @@ #include "yql_dq_state.h" #include <ydb/library/yql/dq/opt/dq_opt_stat.h> +#include <ydb/library/yql/dq/opt/dq_opt_stat_transformer_base.h> #include <ydb/library/yql/dq/integration/yql_dq_integration.h> #include <ydb/library/yql/core/yql_expr_optimize.h> @@ -11,78 +12,44 @@ namespace NYql { using namespace NNodes; -class TDqsStatisticsTransformer : public TSyncTransformerBase { +class TDqsStatisticsTransformer : public NDq::TDqStatisticsTransformerBase { public: TDqsStatisticsTransformer(const TDqStatePtr& state) - : State(state) + : NDq::TDqStatisticsTransformerBase(state->TypeCtx) + , State(state) { } - IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - output = input; - - if (!State->TypeCtx->CostBasedOptimizerType) { - return IGraphTransformer::TStatus::Ok; - } - - TOptimizeExprSettings settings(nullptr); - - auto ret = OptimizeExpr(input, output, [*this](const TExprNode::TPtr& input, TExprContext& ctx) { - Y_UNUSED(ctx); - auto output = input; - bool hasDqSource = false; - - if (TCoFlatMap::Match(input.Get())) { - NDq::InferStatisticsForFlatMap(input, State->TypeCtx); - } else if(TCoSkipNullMembers::Match(input.Get())) { - NDq::InferStatisticsForSkipNullMembers(input, State->TypeCtx); - } else if(TCoExtractMembers::Match(input.Get())){ - NDq::InferStatisticsForExtractMembers(input, State->TypeCtx); - } - else if(TCoAggregateCombine::Match(input.Get())){ - NDq::InferStatisticsForAggregateCombine(input, State->TypeCtx); - } - else if(TCoAggregateMergeFinalize::Match(input.Get())){ - NDq::InferStatisticsForAggregateMergeFinalize(input, State->TypeCtx); - } else if (TDqReadWrapBase::Match(input.Get()) || (hasDqSource = TDqSourceWrapBase::Match(input.Get()))) { - auto node = hasDqSource - ? input - : input->Child(TDqReadWrapBase::idx_Input); - auto dataSourceChildIndex = 1; - YQL_ENSURE(node->ChildrenSize() > 1); - YQL_ENSURE(node->Child(dataSourceChildIndex)->IsCallable("DataSource")); - auto dataSourceName = node->Child(dataSourceChildIndex)->Child(0)->Content(); - auto datasource = State->TypeCtx->DataSourceMap.FindPtr(dataSourceName); - YQL_ENSURE(datasource); - if (auto dqIntegration = (*datasource)->GetDqIntegration()) { - auto stat = dqIntegration->ReadStatistics(node, ctx); - if (stat) { - State->TypeCtx->SetStats(input.Get(), std::move(std::make_shared<TOptimizerStatistics>(*stat))); - } + bool BeforeLambdasSpecific(const TExprNode::TPtr& input, TExprContext& ctx) override { + bool matched = true; + bool hasDqSource = false; + + if (TDqReadWrapBase::Match(input.Get()) || (hasDqSource = TDqSourceWrapBase::Match(input.Get()))) { + auto node = hasDqSource + ? input + : input->Child(TDqReadWrapBase::idx_Input); + auto dataSourceChildIndex = 1; + YQL_ENSURE(node->ChildrenSize() > 1); + YQL_ENSURE(node->Child(dataSourceChildIndex)->IsCallable("DataSource")); + auto dataSourceName = node->Child(dataSourceChildIndex)->Child(0)->Content(); + auto datasource = State->TypeCtx->DataSourceMap.FindPtr(dataSourceName); + YQL_ENSURE(datasource); + if (auto dqIntegration = (*datasource)->GetDqIntegration()) { + auto stat = dqIntegration->ReadStatistics(node, ctx); + if (stat) { + State->TypeCtx->SetStats(input.Get(), std::move(std::make_shared<TOptimizerStatistics>(*stat))); } } - // Don't propagate statistics and costs in case of EquiJoin, join reordering only works if costs have - // not been propagated yet - else if (TCoEquiJoin::Match(input.Get())){ - } else { - // default sum propagation - TOptimizerStatistics stat; - for (const auto& child : input->Children()) { - auto chStat = State->TypeCtx->GetStats(child.Get()); - if (chStat) { - stat += *chStat; - } - } - if (!stat.Empty()) { - State->TypeCtx->SetStats(input.Get(), std::move(std::make_shared<TOptimizerStatistics>(stat))); - } - } - return output; - }, ctx, settings); - - return ret; + } else { + matched = false; + } + return matched; } - void Rewind() { } + bool AfterLambdasSpecific(const TExprNode::TPtr& input, TExprContext& ctx) override { + Y_UNUSED(input); + Y_UNUSED(ctx); + return false; + } private: TDqStatePtr State; |