aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <rvu@ydb.tech>2023-10-31 13:28:19 +0300
committerudovichenko-r <rvu@ydb.tech>2023-10-31 14:02:11 +0300
commit08b33bb3ed7001c64c5a1fc18371b9065f76e8b3 (patch)
tree0b8451c504eda15c19697a0baecf4542af3e1133
parent0c72d00bdec4f92b4fdf6c170b43d9e22ae57e90 (diff)
downloadydb-08b33bb3ed7001c64c5a1fc18371b9065f76e8b3.tar.gz
[dq] Interface for provider specific optimizers
YQL-16013
-rw-r--r--ydb/library/yql/core/yql_data_provider.h2
-rw-r--r--ydb/library/yql/dq/integration/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/integration/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/dq/integration/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/integration/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/integration/ya.make1
-rw-r--r--ydb/library/yql/dq/integration/yql_dq_optimization.cpp1
-rw-r--r--ydb/library/yql/dq/integration/yql_dq_optimization.h75
-rw-r--r--ydb/library/yql/providers/common/dq/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/common/dq/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/common/dq/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/common/dq/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/common/dq/ya.make1
-rw-r--r--ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp25
-rw-r--r--ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.h16
-rw-r--r--ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp4
-rw-r--r--ydb/library/yql/providers/common/provider/yql_data_provider_impl.h1
-rw-r--r--ydb/library/yql/providers/dq/opt/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/dq/opt/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/dq/opt/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/dq/opt/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/dq/opt/logical_optimize.cpp184
-rw-r--r--ydb/library/yql/providers/dq/opt/ya.make1
23 files changed, 320 insertions, 3 deletions
diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h
index 2fc835a47a..95d7aee773 100644
--- a/ydb/library/yql/core/yql_data_provider.h
+++ b/ydb/library/yql/core/yql_data_provider.h
@@ -75,6 +75,7 @@ public:
};
class IDqIntegration;
+class IDqOptimization;
class IOptimizationContext;
@@ -174,6 +175,7 @@ public:
// DQ
virtual IDqIntegration* GetDqIntegration() = 0;
+ virtual IDqOptimization* GetDqOptimization() = 0;
};
struct IPipelineConfigurator;
diff --git a/ydb/library/yql/dq/integration/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/integration/CMakeLists.darwin-x86_64.txt
index 2213c8b415..6ff4d7364c 100644
--- a/ydb/library/yql/dq/integration/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/dq/integration/CMakeLists.darwin-x86_64.txt
@@ -23,4 +23,5 @@ target_link_libraries(yql-dq-integration PUBLIC
)
target_sources(yql-dq-integration PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_integration.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_optimization.cpp
)
diff --git a/ydb/library/yql/dq/integration/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/integration/CMakeLists.linux-aarch64.txt
index f01517462e..1d4e492554 100644
--- a/ydb/library/yql/dq/integration/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/dq/integration/CMakeLists.linux-aarch64.txt
@@ -24,4 +24,5 @@ target_link_libraries(yql-dq-integration PUBLIC
)
target_sources(yql-dq-integration PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_integration.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_optimization.cpp
)
diff --git a/ydb/library/yql/dq/integration/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/integration/CMakeLists.linux-x86_64.txt
index f01517462e..1d4e492554 100644
--- a/ydb/library/yql/dq/integration/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/dq/integration/CMakeLists.linux-x86_64.txt
@@ -24,4 +24,5 @@ target_link_libraries(yql-dq-integration PUBLIC
)
target_sources(yql-dq-integration PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_integration.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_optimization.cpp
)
diff --git a/ydb/library/yql/dq/integration/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/integration/CMakeLists.windows-x86_64.txt
index 2213c8b415..6ff4d7364c 100644
--- a/ydb/library/yql/dq/integration/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/dq/integration/CMakeLists.windows-x86_64.txt
@@ -23,4 +23,5 @@ target_link_libraries(yql-dq-integration PUBLIC
)
target_sources(yql-dq-integration PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_integration.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/integration/yql_dq_optimization.cpp
)
diff --git a/ydb/library/yql/dq/integration/ya.make b/ydb/library/yql/dq/integration/ya.make
index f57695f542..955405344a 100644
--- a/ydb/library/yql/dq/integration/ya.make
+++ b/ydb/library/yql/dq/integration/ya.make
@@ -2,6 +2,7 @@ LIBRARY()
SRCS(
yql_dq_integration.cpp
+ yql_dq_optimization.cpp
)
PEERDIR(
diff --git a/ydb/library/yql/dq/integration/yql_dq_optimization.cpp b/ydb/library/yql/dq/integration/yql_dq_optimization.cpp
new file mode 100644
index 0000000000..b7f55b1f36
--- /dev/null
+++ b/ydb/library/yql/dq/integration/yql_dq_optimization.cpp
@@ -0,0 +1 @@
+#include "yql_dq_optimization.h"
diff --git a/ydb/library/yql/dq/integration/yql_dq_optimization.h b/ydb/library/yql/dq/integration/yql_dq_optimization.h
new file mode 100644
index 0000000000..5177e50c96
--- /dev/null
+++ b/ydb/library/yql/dq/integration/yql_dq_optimization.h
@@ -0,0 +1,75 @@
+#pragma once
+
+#include <ydb/library/yql/ast/yql_expr.h>
+
+namespace NYql {
+
+/**
+    Interface for provider specific optimizations of readers wrapped into DqReadWrap.
+    Every method follows the same convention: an empty result signals an error, the
+    original input signals "nothing to change", any other value is the optimized replacement.
+*/
+class IDqOptimization {
+public:
+    virtual ~IDqOptimization() = default;
+
+    /**
+        Rewrite provider reader under DqReadWrap
+        Args:
+            * reader - provider specific callable of reader
+            * ctx - expr context
+        Returns one of:
+            * empty TPtr on error
+            * original `reader` if no changes
+            * new reader if any optimizations
+    */
+    virtual TExprNode::TPtr RewriteRead(const TExprNode::TPtr& reader, TExprContext& ctx) = 0;
+
+    /**
+        Apply new members subset for DqReadWrap's underlying provider reader
+        Args:
+            * reader - provider specific callable of reader
+            * members - expr list of atoms with new members
+            * ctx - expr context
+        Returns one of:
+            * empty TPtr on error
+            * original `reader` if no changes
+            * new reader with applied new members
+    */
+    virtual TExprNode::TPtr ApplyExtractMembers(const TExprNode::TPtr& reader, const TExprNode::TPtr& members, TExprContext& ctx) = 0;
+
+    /**
+        Apply `take` or `skip` setting for DqReadWrap's underlying provider reader
+        Args:
+            * reader - provider specific callable of reader
+            * countBase - `Take`, `Skip` or `Limit` callable
+            * ctx - expr context
+        Returns one of:
+            * empty TPtr on error
+            * original `reader` if no changes
+            * new reader with applied setting
+    */
+    virtual TExprNode::TPtr ApplyTakeOrSkip(const TExprNode::TPtr& reader, const TExprNode::TPtr& countBase, TExprContext& ctx) = 0;
+
+    /**
+        Apply `unordered` setting for DqReadWrap's underlying provider reader
+        Args:
+            * reader - provider specific callable of reader
+            * ctx - expr context
+        Returns one of:
+            * empty TPtr on error
+            * original `reader` if no changes
+            * new reader with applied setting
+    */
+    virtual TExprNode::TPtr ApplyUnordered(const TExprNode::TPtr& reader, TExprContext& ctx) = 0;
+
+    /**
+        Optimize list/stream extend for set of DqReadWrap's underlying provider readers
+        Args:
+            * listOfReader - expr list of provider specific readers
+            * ordered - `true` for ordered extend (should keep original order of readers), `false` if readers may be reordered after optimization
+            * ctx - expr context
+        Returns one of:
+            * empty list on error
+            * original `listOfReader` if no changes
+            * new optimized list of readers. Returned list length should not be greater than original `listOfReader` length
+    */
+    virtual TExprNode::TListType ApplyExtend(const TExprNode::TListType& listOfReader, bool ordered, TExprContext& ctx) = 0;
+};
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/common/dq/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/common/dq/CMakeLists.darwin-x86_64.txt
index 1070128188..54fa168d21 100644
--- a/ydb/library/yql/providers/common/dq/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/common/dq/CMakeLists.darwin-x86_64.txt
@@ -15,4 +15,5 @@ target_link_libraries(providers-common-dq PUBLIC
)
target_sources(providers-common-dq PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp
)
diff --git a/ydb/library/yql/providers/common/dq/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/dq/CMakeLists.linux-aarch64.txt
index 3fa3069dbc..a99519cbba 100644
--- a/ydb/library/yql/providers/common/dq/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/common/dq/CMakeLists.linux-aarch64.txt
@@ -16,4 +16,5 @@ target_link_libraries(providers-common-dq PUBLIC
)
target_sources(providers-common-dq PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp
)
diff --git a/ydb/library/yql/providers/common/dq/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/common/dq/CMakeLists.linux-x86_64.txt
index 3fa3069dbc..a99519cbba 100644
--- a/ydb/library/yql/providers/common/dq/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/common/dq/CMakeLists.linux-x86_64.txt
@@ -16,4 +16,5 @@ target_link_libraries(providers-common-dq PUBLIC
)
target_sources(providers-common-dq PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp
)
diff --git a/ydb/library/yql/providers/common/dq/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/common/dq/CMakeLists.windows-x86_64.txt
index 1070128188..54fa168d21 100644
--- a/ydb/library/yql/providers/common/dq/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/common/dq/CMakeLists.windows-x86_64.txt
@@ -15,4 +15,5 @@ target_link_libraries(providers-common-dq PUBLIC
)
target_sources(providers-common-dq PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp
)
diff --git a/ydb/library/yql/providers/common/dq/ya.make b/ydb/library/yql/providers/common/dq/ya.make
index 769d0fd727..9fe8d72ade 100644
--- a/ydb/library/yql/providers/common/dq/ya.make
+++ b/ydb/library/yql/providers/common/dq/ya.make
@@ -2,6 +2,7 @@ LIBRARY()
SRCS(
yql_dq_integration_impl.cpp
+ yql_dq_optimization_impl.cpp
)
PEERDIR(
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp
new file mode 100644
index 0000000000..8dac570051
--- /dev/null
+++ b/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.cpp
@@ -0,0 +1,25 @@
+#include "yql_dq_optimization_impl.h"
+
+namespace NYql {
+
+// Default implementations of IDqOptimization: each method reports "no changes"
+// by handing back exactly the reader (or list of readers) it was given.
+
+// Default: no provider specific rewrite, the original `reader` is returned.
+TExprNode::TPtr TDqOptimizationBase::RewriteRead(const TExprNode::TPtr& reader, TExprContext& /*ctx*/) {
+ return reader;
+}
+
+// Default: the members subset is ignored, the original `reader` is returned.
+TExprNode::TPtr TDqOptimizationBase::ApplyExtractMembers(const TExprNode::TPtr& reader, const TExprNode::TPtr& /*members*/, TExprContext& /*ctx*/) {
+ return reader;
+}
+
+// Default: the take/skip callable is ignored, the original `reader` is returned.
+TExprNode::TPtr TDqOptimizationBase::ApplyTakeOrSkip(const TExprNode::TPtr& reader, const TExprNode::TPtr& /*countBase*/, TExprContext& /*ctx*/) {
+ return reader;
+}
+
+// Default: the unordered hint is ignored, the original `reader` is returned.
+TExprNode::TPtr TDqOptimizationBase::ApplyUnordered(const TExprNode::TPtr& reader, TExprContext& /*ctx*/) {
+ return reader;
+}
+
+// Default: readers are not merged, a copy of the original `listOfReader` is returned.
+TExprNode::TListType TDqOptimizationBase::ApplyExtend(const TExprNode::TListType& listOfReader, bool /*ordered*/, TExprContext& /*ctx*/) {
+ return listOfReader;
+}
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.h b/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.h
new file mode 100644
index 0000000000..5817d68250
--- /dev/null
+++ b/ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include <ydb/library/yql/dq/integration/yql_dq_optimization.h>
+
+namespace NYql {
+
+// Convenience base for provider specific DQ optimizers.
+// It overrides every IDqOptimization method, so a provider can derive from it
+// and re-override only the optimizations it actually supports.
+class TDqOptimizationBase: public IDqOptimization {
+public:
+ TExprNode::TPtr RewriteRead(const TExprNode::TPtr& reader, TExprContext& ctx) override;
+ TExprNode::TPtr ApplyExtractMembers(const TExprNode::TPtr& reader, const TExprNode::TPtr& members, TExprContext& ctx) override;
+ TExprNode::TPtr ApplyTakeOrSkip(const TExprNode::TPtr& reader, const TExprNode::TPtr& countBase, TExprContext& ctx) override;
+ TExprNode::TPtr ApplyUnordered(const TExprNode::TPtr& reader, TExprContext& ctx) override;
+ TExprNode::TListType ApplyExtend(const TExprNode::TListType& listOfReader, bool ordered, TExprContext& ctx) override;
+};
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp
index f4e8a8ce1b..62d4cb6ae4 100644
--- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp
@@ -321,6 +321,10 @@ IDqIntegration* TDataProviderBase::GetDqIntegration() {
return nullptr;
}
+// Default: the base provider supplies no DQ optimization object, so nullptr is returned.
+IDqOptimization* TDataProviderBase::GetDqOptimization() {
+ return nullptr;
+}
+
TExprNode::TPtr DefaultCleanupWorld(const TExprNode::TPtr& node, TExprContext& ctx) {
auto root = node;
auto status = OptimizeExpr(root, root, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h
index 445ffa1c18..4c4e88013d 100644
--- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h
+++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h
@@ -88,6 +88,7 @@ public:
ITrackableNodeProcessor& GetTrackableNodeProcessor() override;
IGraphTransformer& GetPlanInfoTransformer() override;
IDqIntegration* GetDqIntegration() override;
+ IDqOptimization* GetDqOptimization() override;
protected:
THolder<IGraphTransformer> DefConstraintTransformer_;
diff --git a/ydb/library/yql/providers/dq/opt/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/opt/CMakeLists.darwin-x86_64.txt
index 652509b9b5..d7b3651197 100644
--- a/ydb/library/yql/providers/dq/opt/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/dq/opt/CMakeLists.darwin-x86_64.txt
@@ -21,6 +21,7 @@ target_link_libraries(providers-dq-opt PUBLIC
yql-utils-log
yql-dq-opt
yql-dq-type_ann
+ yql-dq-integration
library-yql-core
yql-core-peephole_opt
yql-core-type_ann
diff --git a/ydb/library/yql/providers/dq/opt/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/opt/CMakeLists.linux-aarch64.txt
index c39d109453..7e6d60a07c 100644
--- a/ydb/library/yql/providers/dq/opt/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/dq/opt/CMakeLists.linux-aarch64.txt
@@ -22,6 +22,7 @@ target_link_libraries(providers-dq-opt PUBLIC
yql-utils-log
yql-dq-opt
yql-dq-type_ann
+ yql-dq-integration
library-yql-core
yql-core-peephole_opt
yql-core-type_ann
diff --git a/ydb/library/yql/providers/dq/opt/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/opt/CMakeLists.linux-x86_64.txt
index c39d109453..7e6d60a07c 100644
--- a/ydb/library/yql/providers/dq/opt/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/dq/opt/CMakeLists.linux-x86_64.txt
@@ -22,6 +22,7 @@ target_link_libraries(providers-dq-opt PUBLIC
yql-utils-log
yql-dq-opt
yql-dq-type_ann
+ yql-dq-integration
library-yql-core
yql-core-peephole_opt
yql-core-type_ann
diff --git a/ydb/library/yql/providers/dq/opt/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/dq/opt/CMakeLists.windows-x86_64.txt
index 652509b9b5..d7b3651197 100644
--- a/ydb/library/yql/providers/dq/opt/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/dq/opt/CMakeLists.windows-x86_64.txt
@@ -21,6 +21,7 @@ target_link_libraries(providers-dq-opt PUBLIC
yql-utils-log
yql-dq-opt
yql-dq-type_ann
+ yql-dq-integration
library-yql-core
yql-core-peephole_opt
yql-core-type_ann
diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
index 90952bd879..64eba04b14 100644
--- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
@@ -5,9 +5,10 @@
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
#include <ydb/library/yql/dq/opt/dq_opt_join.h>
+#include <ydb/library/yql/dq/integration/yql_dq_optimization.h>
#include <ydb/library/yql/dq/opt/dq_opt_log.h>
-#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/dq/opt/dq_opt.h>
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/core/cbo/cbo_optimizer.h>
@@ -55,6 +56,10 @@ public:
{
#define HNDL(name) "DqsLogical-"#name, Hndl(&TDqsLogicalOptProposalTransformer::name)
AddHandler(0, &TCoUnorderedBase::Match, HNDL(SkipUnordered));
+ AddHandler(0, &TCoUnorderedBase::Match, HNDL(UnorderedOverDqReadWrap));
+ AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqReadWrap));
+ AddHandler(0, &TCoCountBase::Match, HNDL(TakeOrSkipOverDqReadWrap));
+ AddHandler(0, &TCoExtendBase::Match, HNDL(ExtendOverDqReadWrap));
AddHandler(0, &TCoAggregateBase::Match, HNDL(RewriteAggregate));
AddHandler(0, &TCoTake::Match, HNDL(RewriteTakeSortToTopSort));
AddHandler(0, &TCoEquiJoin::Match, HNDL(OptimizeEquiJoinWithCosts));
@@ -65,19 +70,168 @@ public:
AddHandler(0, &TCoFlatMapBase::Match, HNDL(FlatMapOverExtend));
AddHandler(0, &TDqQuery::Match, HNDL(MergeQueriesWithSinks));
AddHandler(0, &TCoSqlIn::Match, HNDL(SqlInDropCompact));
+ AddHandler(0, &TDqReadWrapBase::Match, HNDL(DqReadWrapByProvider));
#undef HNDL
}
protected:
TMaybeNode<TExprBase> SkipUnordered(TExprBase node, TExprContext& ctx) {
Y_UNUSED(ctx);
- auto unordered = node.Cast<TCoUnorderedBase>();
+ const auto unordered = node.Cast<TCoUnorderedBase>();
if (unordered.Input().Maybe<TDqConnection>()) {
return unordered.Input();
}
return node;
}
+    // Offers the `unordered` hint placed over a DqReadWrap to the provider that owns the reader.
+    // Returns `node` when nothing applies, an empty result when the provider reports an error,
+    // otherwise the DqReadWrap rebuilt around the reader returned by the provider.
+    TMaybeNode<TExprBase> UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
+        const auto unordered = node.Cast<TCoUnorderedBase>();
+        const auto maybeRead = unordered.Input().Maybe<TDqReadWrapBase>().Input();
+        if (!maybeRead) {
+            return node;
+        }
+
+        if (Config->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate)) {
+            // With replicate enabled, a DqReadWrap that has several parents is left untouched.
+            const TParentsMap* allParents = getParents();
+            auto wrapParents = allParents->find(unordered.Input().Raw());
+            YQL_ENSURE(wrapParents != allParents->cend());
+            if (wrapParents->second.size() > 1) {
+                return node;
+            }
+        }
+
+        auto providerRead = maybeRead.Cast();
+        auto dqOpt = GetDqOptCallback(providerRead);
+        if (!dqOpt) {
+            return node;
+        }
+
+        auto newRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx);
+        if (!newRead) {
+            return {};
+        }
+        if (newRead != providerRead.Ptr()) {
+            return TExprBase(ctx.ChangeChild(unordered.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(newRead)));
+        }
+        return node;
+    }
+
+
+    // Offers the members list of ExtractMembers over a DqReadWrap to the provider that owns the reader.
+    // Returns `node` when nothing applies, an empty result when the provider reports an error,
+    // otherwise the DqReadWrap rebuilt around the reader returned by the provider.
+    TMaybeNode<TExprBase> ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
+        auto extractMembers = node.Cast<TCoExtractMembers>();
+        const auto maybeRead = extractMembers.Input().Maybe<TDqReadWrap>().Input();
+        if (!maybeRead) {
+            return node;
+        }
+
+        if (Config->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate)) {
+            // With replicate enabled, a DqReadWrap that has several parents is left untouched.
+            const TParentsMap* allParents = getParents();
+            auto wrapParents = allParents->find(extractMembers.Input().Raw());
+            YQL_ENSURE(wrapParents != allParents->cend());
+            if (wrapParents->second.size() > 1) {
+                return node;
+            }
+        }
+
+        auto providerRead = maybeRead.Cast();
+        auto dqOpt = GetDqOptCallback(providerRead);
+        if (!dqOpt) {
+            return node;
+        }
+
+        auto newRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), extractMembers.Members().Ptr(), ctx);
+        if (!newRead) {
+            return {};
+        }
+        if (newRead != providerRead.Ptr()) {
+            return TExprBase(ctx.ChangeChild(extractMembers.Input().Ref(), TDqReadWrap::idx_Input, std::move(newRead)));
+        }
+        return node;
+    }
+
+
+    // Offers a Take/Skip style callable placed over a DqReadWrap to the provider that owns the reader.
+    // Only counts built by an integral constructor are handled.
+    // Returns `node` when nothing applies, an empty result when the provider reports an error,
+    // otherwise the DqReadWrap rebuilt around the reader returned by the provider.
+    TMaybeNode<TExprBase> TakeOrSkipOverDqReadWrap(TExprBase node, TExprContext& ctx) {
+        auto limit = node.Cast<TCoCountBase>();
+
+        // TODO: support via precomputes
+        if (!TCoIntegralCtor::Match(limit.Count().Raw())) {
+            return node;
+        }
+
+        const auto maybeRead = limit.Input().Maybe<TDqReadWrapBase>().Input();
+        if (!maybeRead) {
+            return node;
+        }
+
+        auto providerRead = maybeRead.Cast();
+        auto dqOpt = GetDqOptCallback(providerRead);
+        if (!dqOpt) {
+            return node;
+        }
+
+        auto newRead = dqOpt->ApplyTakeOrSkip(providerRead.Ptr(), limit.Ptr(), ctx);
+        if (!newRead) {
+            return {};
+        }
+        if (newRead != providerRead.Ptr()) {
+            return TExprBase(ctx.ChangeChild(limit.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(newRead)));
+        }
+        return node;
+    }
+
+
+    // Lets providers merge several DqReadWrap arguments of an Extend/OrderedExtend.
+    // Readers are grouped by the IDqOptimization of their data source and every group with
+    // at least two readers is passed to ApplyExtend.
+    // Returns:
+    //  * `node` when there is nothing to merge or no provider changed its readers
+    //  * an empty result when a provider reports an error (empty list)
+    //  * otherwise the extend with merged readers, or its single remaining child
+    TMaybeNode<TExprBase> ExtendOverDqReadWrap(TExprBase node, TExprContext& ctx) const {
+        auto extend = node.Cast<TCoExtendBase>();
+        const bool ordered = node.Maybe<TCoOrderedExtend>().IsValid();
+        const TExprNode* flags = nullptr;
+        const TExprNode* token = nullptr;
+        bool first = true;
+        // For every provider optimizer: (argument index in extend, provider reader) pairs.
+        std::unordered_map<IDqOptimization*, std::vector<std::pair<size_t, TExprNode::TPtr>>> readers;
+        IDqOptimization* prevDqOpt = nullptr;
+        for (size_t i = 0; i < extend.ArgCount(); ++i) {
+            const auto child = extend.Arg(i);
+            if (!TDqReadWrapBase::Match(child.Raw())) {
+                prevDqOpt = nullptr;
+                continue;
+            }
+            auto dqReadWrap = child.Cast<TDqReadWrapBase>();
+
+            // Only wraps sharing the flags and token nodes of the first DqReadWrap are considered.
+            if (first) {
+                flags = dqReadWrap.Flags().Raw();
+                token = dqReadWrap.Token().Raw();
+                first = false;
+            } else if (flags != dqReadWrap.Flags().Raw() || token != dqReadWrap.Token().Raw()) {
+                prevDqOpt = nullptr;
+                continue;
+            }
+            IDqOptimization* dqOpt = GetDqOptCallback(dqReadWrap.Input());
+            if (!dqOpt) {
+                prevDqOpt = nullptr;
+                continue;
+            }
+            if (ordered && prevDqOpt != dqOpt) {
+                // Ordered extend: the collected run of this provider restarts whenever
+                // the previous argument did not belong to the same provider.
+                readers[dqOpt].assign(1, std::make_pair(i, dqReadWrap.Input().Ptr()));
+            } else {
+                readers[dqOpt].emplace_back(i, dqReadWrap.Input().Ptr());
+            }
+            prevDqOpt = dqOpt;
+        }
+
+        if (readers.empty() || AllOf(readers, [](const auto& item) { return item.second.size() < 2; })) {
+            return node;
+        }
+
+        TExprNode::TListType newChildren = extend.Ref().ChildrenList();
+        bool changed = false;
+        for (auto& [dqOpt, list]: readers) {
+            if (list.size() < 2) {
+                continue;
+            }
+            TExprNode::TListType inReaders;
+            std::transform(list.begin(), list.end(), std::back_inserter(inReaders), [](const auto& item) { return item.second; });
+            TExprNode::TListType outReaders = dqOpt->ApplyExtend(inReaders, ordered, ctx);
+            if (outReaders.empty()) {
+                return {};
+            }
+            if (inReaders == outReaders) {
+                // The provider returned the original list, i.e. "no changes": keep the existing
+                // children instead of rebuilding identical DqReadWrap nodes.
+                continue;
+            }
+            YQL_ENSURE(outReaders.size() <= inReaders.size());
+            changed = true;
+            size_t i = 0;
+            for (; i < outReaders.size(); ++i) {
+                newChildren[list[i].first] = ctx.ChangeChild(*newChildren[list[i].first], TDqReadWrapBase::idx_Input, std::move(outReaders[i]));
+            }
+            // Arguments that were merged away are marked for removal.
+            for (; i < list.size(); ++i) {
+                newChildren[list[i].first] = nullptr;
+            }
+        }
+        if (!changed) {
+            // Returning a rebuilt but equivalent node would be reported as a change.
+            return node;
+        }
+        newChildren.erase(std::remove(newChildren.begin(), newChildren.end(), TExprNode::TPtr{}), newChildren.end());
+        YQL_ENSURE(!newChildren.empty());
+        if (newChildren.size() > 1) {
+            return TExprBase(ctx.ChangeChildren(extend.Ref(), std::move(newChildren)));
+        } else {
+            return TExprBase(newChildren.front());
+        }
+    }
+
+
TMaybeNode<TExprBase> FlatMapOverExtend(TExprBase node, TExprContext& ctx) {
return DqFlatMapOverExtend(node, ctx);
}
@@ -117,7 +271,7 @@ protected:
TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
if (TypesCtx.CostBasedOptimizer != ECostBasedOptimizerType::Disable) {
- std::function<void(const TString&)> log = [&](auto str) {
+ std::function<void(const TString&)> log = [&](auto str) {
YQL_CLOG(INFO, ProviderDq) << str;
};
std::function<IOptimizer*(IOptimizer::TInput&&)> factory = [&](auto input) {
@@ -176,6 +330,20 @@ protected:
return DqSqlInDropCompact(node, ctx);
}
+    // Gives the provider that owns the reader under a DqReadWrap a chance to rewrite it.
+    // Returns `node` when nothing changes, an empty result when the provider reports an error,
+    // otherwise the DqReadWrap with its input replaced by the rewritten reader.
+    TMaybeNode<TExprBase> DqReadWrapByProvider(TExprBase node, TExprContext& ctx) const {
+        auto providerRead = node.Cast<TDqReadWrapBase>().Input();
+        auto dqOpt = GetDqOptCallback(providerRead);
+        if (!dqOpt) {
+            return node;
+        }
+
+        auto rewritten = dqOpt->RewriteRead(providerRead.Ptr(), ctx);
+        if (!rewritten) {
+            return {};
+        }
+        if (rewritten != providerRead.Ptr()) {
+            return TExprBase(ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(rewritten)));
+        }
+        return node;
+    }
+
private:
TMaybeNode<TExprBase> RewriteAsHoppingWindow(const TExprBase node, TExprContext& ctx, const TDqConnection& input) {
const auto aggregate = node.Cast<TCoAggregate>();
@@ -916,6 +1084,16 @@ private:
return enableWatermarks;
}
+    // Finds the DQ optimizer of the provider that owns `providerRead`.
+    // The provider is taken from the second child of the reader when that child is a DataSource;
+    // its first child holds the name looked up in TypesCtx.DataSourceMap.
+    // Returns nullptr when the reader has no such child; the result of GetDqOptimization() otherwise.
+    IDqOptimization* GetDqOptCallback(const TExprBase& providerRead) const {
+        const auto& read = providerRead.Ref();
+        if (read.ChildrenSize() <= 1 || !TCoDataSource::Match(read.Child(1))) {
+            return nullptr;
+        }
+
+        auto providerName = read.Child(1)->Child(0)->Content();
+        auto provider = TypesCtx.DataSourceMap.FindPtr(providerName);
+        YQL_ENSURE(provider);
+        return (*provider)->GetDqOptimization();
+    }
+
private:
TDqConfiguration::TPtr Config;
TTypeAnnotationContext& TypesCtx;
diff --git a/ydb/library/yql/providers/dq/opt/ya.make b/ydb/library/yql/providers/dq/opt/ya.make
index 63d88fe7bb..842d413aff 100644
--- a/ydb/library/yql/providers/dq/opt/ya.make
+++ b/ydb/library/yql/providers/dq/opt/ya.make
@@ -8,6 +8,7 @@ PEERDIR(
ydb/library/yql/utils/log
ydb/library/yql/dq/opt
ydb/library/yql/dq/type_ann
+ ydb/library/yql/dq/integration
ydb/library/yql/core
ydb/library/yql/core/peephole_opt
ydb/library/yql/core/type_ann