aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <rvu@ydb.tech>2023-11-02 19:24:28 +0300
committerudovichenko-r <rvu@ydb.tech>2023-11-02 19:57:18 +0300
commita528d5d25d42706fe385120b27e1df3a257823fb (patch)
treebaf2feb973b022dc9aa2e01f6740b08b9ca668ef
parent27204b42f48b633cfb3eecc9f13b4331d32516b3 (diff)
downloadydb-a528d5d25d42706fe385120b27e1df3a257823fb.tar.gz
[dq] YT specific optimizers for DQ
YQL-16013
-rw-r--r--ydb/library/yql/providers/dq/opt/logical_optimize.cpp17
-rw-r--r--ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/yt/provider/ya.make1
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp7
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp197
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.h13
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp158
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp97
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json30
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part7/canondata/result.json18
13 files changed, 257 insertions, 285 deletions
diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
index f1681187ce..5c2f0628e4 100644
--- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
@@ -71,7 +71,8 @@ public:
AddHandler(0, &TDqQuery::Match, HNDL(MergeQueriesWithSinks));
AddHandler(0, &TCoSqlIn::Match, HNDL(SqlInDropCompact));
AddHandler(0, &TDqReplicate::Match, HNDL(ReplicateFieldSubset));
- AddHandler(0, &TDqReadWrapBase::Match, HNDL(DqReadWrapByProvider));
+
+ AddHandler(1, &TDqReadWrapBase::Match, HNDL(DqReadWrapByProvider));
#undef HNDL
}
@@ -85,9 +86,10 @@ protected:
return node;
}
- TMaybeNode<TExprBase> UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
+ TMaybeNode<TExprBase> UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& /*getParents*/) const {
const auto unordered = node.Cast<TCoUnorderedBase>();
if (const auto maybeRead = unordered.Input().Maybe<TDqReadWrapBase>().Input()) {
+ /*
if (Config->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate)) {
const TParentsMap* parentsMap = getParents();
auto parentsIt = parentsMap->find(unordered.Input().Raw());
@@ -96,6 +98,7 @@ protected:
return node;
}
}
+ */
auto providerRead = maybeRead.Cast();
if (auto dqOpt = GetDqOptCallback(providerRead)) {
auto updatedRead = dqOpt->ApplyUnordered(providerRead.Ptr(), ctx);
@@ -111,9 +114,10 @@ protected:
return node;
}
- TMaybeNode<TExprBase> ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
+ TMaybeNode<TExprBase> ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& /*getParents*/) {
auto extract = node.Cast<TCoExtractMembers>();
if (const auto maybeRead = extract.Input().Maybe<TDqReadWrap>().Input()) {
+ /*
if (Config->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate)) {
const TParentsMap* parentsMap = getParents();
auto parentsIt = parentsMap->find(extract.Input().Raw());
@@ -122,7 +126,7 @@ protected:
return node;
}
}
-
+ */
auto providerRead = maybeRead.Cast();
if (auto dqOpt = GetDqOptCallback(providerRead)) {
auto updatedRead = dqOpt->ApplyExtractMembers(providerRead.Ptr(), extract.Members().Ptr(), ctx);
@@ -212,9 +216,10 @@ protected:
if (outReaders.empty()) {
return {};
}
- if (inReaders != outReaders) {
- YQL_ENSURE(outReaders.size() <= inReaders.size());
+ if (inReaders == outReaders) {
+ return node;
}
+ YQL_ENSURE(outReaders.size() <= inReaders.size());
size_t i = 0;
for (; i < outReaders.size(); ++i) {
newChildren[list[i].first] = ctx.ChangeChild(*newChildren[list[i].first], TDqReadWrapBase::idx_Input, std::move(outReaders[i]));
diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt
index dd6cc43535..bcd890489a 100644
--- a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt
@@ -105,6 +105,7 @@ target_sources(providers-yt-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table_desc.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp
)
diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt
index 6d095ca3b0..4fb7023356 100644
--- a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt
@@ -106,6 +106,7 @@ target_sources(providers-yt-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table_desc.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp
)
diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt
index 6d095ca3b0..4fb7023356 100644
--- a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt
@@ -106,6 +106,7 @@ target_sources(providers-yt-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table_desc.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp
)
diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt
index dd6cc43535..bcd890489a 100644
--- a/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt
@@ -105,6 +105,7 @@ target_sources(providers-yt-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table_desc.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_table.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp
)
diff --git a/ydb/library/yql/providers/yt/provider/ya.make b/ydb/library/yql/providers/yt/provider/ya.make
index c0a858a1b3..744053b65e 100644
--- a/ydb/library/yql/providers/yt/provider/ya.make
+++ b/ydb/library/yql/providers/yt/provider/ya.make
@@ -37,6 +37,7 @@ SRCS(
yql_yt_table_desc.cpp
yql_yt_table.cpp
yql_yt_dq_integration.cpp
+ yql_yt_dq_optimize.cpp
yql_yt_dq_hybrid.cpp
yql_yt_wide_flow.cpp
)
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp
index f11e0f1eff..4d96812bab 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp
@@ -4,6 +4,7 @@
#include "yql_yt_key.h"
#include "yql_yt_op_settings.h"
#include "yql_yt_dq_integration.h"
+#include "yql_yt_dq_optimize.h"
#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
@@ -87,6 +88,7 @@ public:
bool collectNodes = mode == EReleaseTempDataMode::Immediate;
return MakeHolder<TYtDataSourceTrackableNodeProcessor>(collectNodes);
})
+ , DqOptimizer_([this]() { return CreateYtDqOptimizers(State_); })
{
}
@@ -483,6 +485,10 @@ public:
return State_->DqIntegration_.Get();
}
+ IDqOptimization* GetDqOptimization() override {
+ return DqOptimizer_.Get();
+ }
+
private:
TExprNode::TPtr InjectUdfRemapperOrView(TYtRead readNode, TExprContext& ctx, bool fromReadSchema) {
const bool weakConcat = NYql::HasSetting(readNode.Arg(4).Ref(), EYtSettingType::WeakConcat);
@@ -845,6 +851,7 @@ private:
TLazyInitHolder<IGraphTransformer> ConstraintTransformer_;
TLazyInitHolder<TExecTransformerBase> ExecTransformer_;
TLazyInitHolder<TYtDataSourceTrackableNodeProcessor> TrackableNodeProcessor_;
+ TLazyInitHolder<IDqOptimization> DqOptimizer_;
};
TIntrusivePtr<IDataProvider> CreateYtDataSource(TYtState::TPtr state) {
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp
new file mode 100644
index 0000000000..db75d69c2e
--- /dev/null
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.cpp
@@ -0,0 +1,197 @@
+#include "yql_yt_dq_optimize.h"
+#include "yql_yt_helpers.h"
+#include "yql_yt_optimize.h"
+
+#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
+#include <ydb/library/yql/providers/common/dq/yql_dq_optimization_impl.h>
+#include <ydb/library/yql/core/yql_expr_optimize.h>
+#include <ydb/library/yql/utils/log/log.h>
+
+
+namespace NYql {
+
+using namespace NNodes;
+
+class TYtDqOptimizers: public TDqOptimizationBase {
+public:
+ TYtDqOptimizers(TYtState::TPtr state)
+ : State_(state)
+ {
+ }
+
+ TExprNode::TPtr RewriteRead(const TExprNode::TPtr& reader, TExprContext& ctx) override {
+ if (auto disabledOpts = State_->Configuration->DisableOptimizers.Get(); disabledOpts && disabledOpts->contains(TStringBuilder() << "YtDqOptimizers-" << __FUNCTION__)) {
+ return reader;
+ }
+ auto ytReader = TYtReadTable(reader);
+
+ TYtDSource dataSource = GetDataSource(ytReader, ctx);
+ if (!State_->Configuration->_EnableYtPartitioning.Get(dataSource.Cluster().StringValue()).GetOrElse(false)) {
+ return reader;
+ }
+
+ TSyncMap syncList;
+ auto ret = OptimizeReadWithSettings(reader, false, false, syncList, State_, ctx);
+ YQL_ENSURE(syncList.empty());
+ if (ret && ret != reader) {
+ YQL_CLOG(DEBUG, ProviderYt) << __FUNCTION__;
+ }
+ return ret;
+ }
+
+ TExprNode::TPtr ApplyExtractMembers(const TExprNode::TPtr& reader, const TExprNode::TPtr& members, TExprContext& ctx) override {
+ if (auto disabledOpts = State_->Configuration->DisableOptimizers.Get(); disabledOpts && disabledOpts->contains(TStringBuilder() << "YtDqOptimizers-" << __FUNCTION__)) {
+ return reader;
+ }
+ auto ytReader = TYtReadTable(reader);
+
+ TVector<TYtSection> sections;
+ for (auto section: ytReader.Input()) {
+ sections.push_back(UpdateInputFields(section, TExprBase(members), ctx));
+ }
+ YQL_CLOG(DEBUG, ProviderYt) << __FUNCTION__;
+ return Build<TYtReadTable>(ctx, ytReader.Pos())
+ .InitFrom(ytReader)
+ .Input()
+ .Add(sections)
+ .Build()
+ .Done().Ptr();
+ }
+
+ TExprNode::TPtr ApplyTakeOrSkip(const TExprNode::TPtr& reader, const TExprNode::TPtr& countBase, TExprContext& ctx) override {
+ if (auto disabledOpts = State_->Configuration->DisableOptimizers.Get(); disabledOpts && disabledOpts->contains(TStringBuilder() << "YtDqOptimizers-" << __FUNCTION__)) {
+ return reader;
+ }
+ auto ytReader = TYtReadTable(reader);
+ auto count = TCoCountBase(countBase);
+
+ if (ytReader.Input().Size() != 1) {
+ return reader;
+ }
+
+ TYtDSource dataSource = GetDataSource(ytReader, ctx);
+ TString cluster = dataSource.Cluster().StringValue();
+
+ if (!State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false)) {
+ return reader;
+ }
+
+ TSyncMap syncList;
+ if (!IsYtCompleteIsolatedLambda(count.Count().Ref(), syncList, cluster, true, false)) {
+ return reader;
+ }
+
+ TYtSection section = ytReader.Input().Item(0);
+ if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::Sample)) {
+ return reader;
+ }
+ if (AnyOf(section.Paths(), [](const auto& path) { TYtPathInfo pathInfo(path); return (pathInfo.Table->Meta && pathInfo.Table->Meta->IsDynamic) || pathInfo.Ranges; })) {
+ return reader;
+ }
+
+ YQL_CLOG(DEBUG, ProviderYt) << __FUNCTION__;
+ EYtSettingType settingType = count.Maybe<TCoSkip>() ? EYtSettingType::Skip : EYtSettingType::Take;
+ return Build<TYtReadTable>(ctx, ytReader.Pos())
+ .InitFrom(ytReader)
+ .Input()
+ .Add()
+ .InitFrom(section)
+ .Settings(NYql::AddSetting(section.Settings().Ref(), settingType, count.Count().Ptr(), ctx))
+ .Build()
+ .Build()
+ .Done().Ptr();
+ }
+
+ TExprNode::TPtr ApplyUnordered(const TExprNode::TPtr& reader, TExprContext& ctx) override {
+ if (auto disabledOpts = State_->Configuration->DisableOptimizers.Get(); disabledOpts && disabledOpts->contains(TStringBuilder() << "YtDqOptimizers-" << __FUNCTION__)) {
+ return reader;
+ }
+ auto ytReader = TYtReadTable(reader);
+
+ TExprNode::TListType sections(ytReader.Input().Size());
+ for (auto i = 0U; i < sections.size(); ++i) {
+ sections[i] = MakeUnorderedSection<true>(ytReader.Input().Item(i), ctx).Ptr();
+ }
+
+ YQL_CLOG(DEBUG, ProviderYt) << __FUNCTION__;
+ return Build<TYtReadTable>(ctx, ytReader.Pos())
+ .InitFrom(ytReader)
+ .Input()
+ .Add(std::move(sections))
+ .Build()
+ .Done().Ptr();
+ }
+
+ TExprNode::TListType ApplyExtend(const TExprNode::TListType& listOfReader, bool /*ordered*/, TExprContext& ctx) override {
+ if (auto disabledOpts = State_->Configuration->DisableOptimizers.Get(); disabledOpts && disabledOpts->contains(TStringBuilder() << "YtDqOptimizers-" << __FUNCTION__)) {
+ return listOfReader;
+ }
+
+ // Group readres by cluster/settings
+ THashMap<std::pair<TStringBuf, TExprNode::TPtr>, std::vector<std::pair<size_t, TYtReadTable>>> readers;
+ for (size_t i = 0; i < listOfReader.size(); ++i) {
+ const auto child = listOfReader[i];
+ auto ytRead = TYtReadTable(child);
+ if (ytRead.Input().Size() != 1) {
+ continue;
+ }
+ if (NYql::HasAnySetting(ytRead.Input().Item(0).Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip)) {
+ continue;
+ }
+ readers[std::make_pair(ytRead.DataSource().Cluster().Value(), ytRead.Input().Item(0).Settings().Ptr())].emplace_back(i, ytRead);
+ }
+
+ if (readers.empty() || AllOf(readers, [](const auto& item) { return item.second.size() < 2; })) {
+ return listOfReader;
+ }
+
+ YQL_CLOG(DEBUG, ProviderYt) << __FUNCTION__;
+
+ TExprNode::TListType newListOfReader = listOfReader;
+ for (auto& item: readers) {
+ if (item.second.size() > 1) {
+ TSyncMap syncList;
+ TVector<TYtPath> paths;
+ for (auto& r: item.second) {
+ if (!r.second.World().Ref().IsWorld()) {
+ syncList.emplace(r.second.World().Ptr(), syncList.size());
+ }
+ paths.insert(paths.end(), r.second.Input().Item(0).Paths().begin(), r.second.Input().Item(0).Paths().end());
+ }
+ const auto ndx = item.second.front().first;
+ const auto& firstYtReader = item.second.front().second;
+ newListOfReader[ndx] = Build<TYtReadTable>(ctx, firstYtReader.Pos())
+ .InitFrom(firstYtReader)
+ .World(ApplySyncListToWorld(ctx.NewWorld(firstYtReader.Pos()), syncList, ctx))
+ .Input()
+ .Add()
+ .Paths()
+ .Add(paths)
+ .Build()
+ .Settings(firstYtReader.Input().Item(0).Settings())
+ .Build()
+ .Build()
+ .Done().Ptr();
+
+ for (size_t i = 1; i < item.second.size(); ++i) {
+ newListOfReader[item.second[i].first] = nullptr;
+ }
+ }
+ }
+
+ newListOfReader.erase(std::remove(newListOfReader.begin(), newListOfReader.end(), TExprNode::TPtr{}), newListOfReader.end());
+ YQL_ENSURE(!newListOfReader.empty());
+
+ return newListOfReader;
+ }
+
+private:
+ TYtState::TPtr State_;
+};
+
+THolder<IDqOptimization> CreateYtDqOptimizers(TYtState::TPtr state) {
+ Y_ABORT_UNLESS(state);
+ return MakeHolder<TYtDqOptimizers>(state);
+}
+
+}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.h b/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.h
new file mode 100644
index 0000000000..91bb6c7b98
--- /dev/null
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_optimize.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include "yql_yt_provider.h"
+
+#include <ydb/library/yql/dq/integration/yql_dq_optimization.h>
+
+#include <util/generic/ptr.h>
+
+namespace NYql {
+
+THolder<IDqOptimization> CreateYtDqOptimizers(TYtState::TPtr state);
+
+}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp
index 13b0525e8e..dbfcd6d135 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_logical_optimize.cpp
@@ -6,7 +6,6 @@
#include <ydb/library/yql/providers/yt/opt/yql_yt_join.h>
#include <ydb/library/yql/providers/yt/opt/yql_yt_key_selector.h>
#include <ydb/library/yql/providers/yt/common/yql_configuration.h>
-#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/codec/yql_codec_type_flags.h>
@@ -65,16 +64,13 @@ public:
AddHandler(1, &TCoAggregateBase::Match, HNDL(Aggregate));
AddHandler(1, &TCoExtractMembers::Match, HNDL(ExtractMembers));
AddHandler(1, &TCoExtractMembers::Match, HNDL(ExtractMembersOverContent));
- AddHandler(1, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqWrap));
AddHandler(1, &TCoRight::Match, HNDL(PushdownReadColumns));
AddHandler(1, &TYtTransientOpBase::Match, HNDL(PushdownOpColumns));
AddHandler(1, &TCoCountBase::Match, HNDL(TakeOrSkip));
- AddHandler(1, &TCoCountBase::Match, HNDL(TakeOrSkipOverDqWrap));
AddHandler(1, &TCoEquiJoin::Match, HNDL(SelfInnerJoinWithSameKeys));
AddHandler(1, &TCoExtendBase::Match, HNDL(ExtendOverSameMap));
AddHandler(1, &TCoFlatMapBase::Match, HNDL(FlatMapOverExtend));
AddHandler(1, &TCoTake::Match, HNDL(TakeOverExtend));
- AddHandler(1, &TCoExtendBase::Match, HNDL(ExtendOverDqWrap));
AddHandler(2, &TCoEquiJoin::Match, HNDL(ConvertToCommonTypeForForcedMergeJoin));
AddHandler(2, &TCoShuffleByKeys::Match, HNDL(ShuffleByKeys));
@@ -1331,29 +1327,6 @@ protected:
.Done();
}
- TMaybeNode<TExprBase> ExtractMembersOverDqWrap(TExprBase node, TExprContext& ctx) {
- auto extract = node.Cast<TCoExtractMembers>();
-
- if (auto maybeYtRead = extract.Input().Maybe<TDqReadWrapBase>().Input().Maybe<TYtReadTable>()) {
- auto ytRead = maybeYtRead.Cast();
-
- TVector<TYtSection> sections;
- for (auto section: ytRead.Input()) {
- sections.push_back(UpdateInputFields(section, extract.Members(), ctx));
- }
- auto updatedRead = Build<TYtReadTable>(ctx, ytRead.Pos())
- .InitFrom(ytRead)
- .Input()
- .Add(sections)
- .Build()
- .Done().Ptr();
-
- return TExprBase(ctx.ChangeChild(extract.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead)));
- }
-
- return node;
- }
-
TMaybeNode<TExprBase> TakeOrSkip(TExprBase node, TExprContext& ctx) const {
auto countBase = node.Cast<TCoCountBase>();
auto input = countBase.Input();
@@ -1384,59 +1357,6 @@ protected:
return ctx.ChangeChild(countBase.Input().Ref(), TYtTableContent::idx_Input, ConvertContentInputToRead(input, settings, ctx).Ptr());
}
- TMaybeNode<TExprBase> TakeOrSkipOverDqWrap(TExprBase node, TExprContext& ctx) {
- auto countBase = node.Cast<TCoCountBase>();
-
- if (auto maybeYtRead = countBase.Input().Maybe<TDqReadWrapBase>().Input().Maybe<TYtReadTable>()) {
- auto ytRead = maybeYtRead.Cast();
- if (ytRead.Input().Size() != 1) {
- return node;
- }
-
- TYtDSource dataSource = GetDataSource(ytRead, ctx);
- TString cluster = dataSource.Cluster().StringValue();
-
- if (!State_->Configuration->_EnableYtPartitioning.Get(cluster).GetOrElse(false)) {
- return node;
- }
-
- // TODO: support via precomputes
- if (!TCoIntegralCtor::Match(countBase.Count().Raw())) {
- return node;
- }
-
- TSyncMap syncList;
- if (!IsYtCompleteIsolatedLambda(countBase.Count().Ref(), syncList, cluster, true, false)) {
- return node;
- }
-
- TYtSection section = ytRead.Input().Item(0);
- if (NYql::HasSetting(section.Settings().Ref(), EYtSettingType::Sample)) {
- return node;
- }
- if (AnyOf(section.Paths(), [](const auto& path) { TYtPathInfo pathInfo(path); return (pathInfo.Table->Meta && pathInfo.Table->Meta->IsDynamic) || pathInfo.Ranges; })) {
- return node;
- }
-
- EYtSettingType settingType = node.Maybe<TCoSkip>() ? EYtSettingType::Skip : EYtSettingType::Take;
-
- auto updatedRead = Build<TYtReadTable>(ctx, ytRead.Pos())
- .InitFrom(ytRead)
- .Input()
- .Add()
- .InitFrom(section)
- .Settings(NYql::AddSetting(section.Settings().Ref(), settingType, countBase.Count().Ptr(), ctx))
- .Build()
- .Build()
- .Done().Ptr();
-
- return TExprBase(ctx.ChangeChild(countBase.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead)));
-
- }
-
- return node;
- }
-
TMaybeNode<TExprBase> BypassCopy(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
auto srcOut = node.Cast<TYtOutput>();
auto maybeCopy = srcOut.Operation().Maybe<TYtCopy>();
@@ -1713,69 +1633,6 @@ protected:
.Done();
}
- TMaybeNode<TExprBase> ExtendOverDqWrap(TExprBase node, TExprContext& ctx) const {
- auto extend = node.Cast<TCoExtendBase>();
- const TExprNode* flags = nullptr;
- const TExprNode* token = nullptr;
- TString cluster;
- const TExprNode* world = nullptr;
- bool first = true;
- TVector<TYtPath> paths;
- TExprNode::TListType nonDQ;
- TMaybe<TExprBase> anyDQ;
- for (auto child: extend) {
- if (!TDqReadWrapBase::Match(child.Raw())) {
- nonDQ.push_back(child.Ptr());
- continue;
- }
- auto dqReadWrap = child.Cast<TDqReadWrapBase>();
- if (!dqReadWrap.Input().Maybe<TYtReadTable>()) {
- nonDQ.push_back(child.Ptr());
- continue;
- }
- auto ytRead = dqReadWrap.Input().Cast<TYtReadTable>();
- if (ytRead.Input().Size() != 1 || ytRead.Input().Item(0).Settings().Size() != 0) {
- nonDQ.push_back(child.Ptr());
- continue;
- }
-
- if (first) {
- flags = dqReadWrap.Flags().Raw();
- token = dqReadWrap.Token().Raw();
- cluster = ytRead.DataSource().Cluster().StringValue();
- world = ytRead.World().Raw();
- first = false;
- } else if (flags != dqReadWrap.Flags().Raw() || token != dqReadWrap.Token().Raw() || cluster != ytRead.DataSource().Cluster().Value() || world != ytRead.World().Raw()) {
- nonDQ.push_back(child.Ptr());
- continue;
- }
- anyDQ = child;
- paths.insert(paths.end(), ytRead.Input().Item(0).Paths().begin(), ytRead.Input().Item(0).Paths().end());
- }
- if (!anyDQ || extend.Ref().ChildrenSize() - nonDQ.size() < 2 || (nonDQ.size() && node.Maybe<TCoOrderedExtend>())) {
- return node;
- }
- auto dqReadWrap = *anyDQ;
- auto newRead = Build<TYtReadTable>(ctx, extend.Pos())
- .InitFrom(dqReadWrap.Cast<TDqReadWrapBase>().Input().Cast<TYtReadTable>())
- .Input()
- .Add()
- .Paths()
- .Add(paths)
- .Build()
- .Settings()
- .Build()
- .Build()
- .Build()
- .Done().Ptr();
- if (!nonDQ.empty()) {
- nonDQ.push_back(ctx.ChangeChild(dqReadWrap.Ref(), TDqReadWrapBase::idx_Input, std::move(newRead)));
- return TExprBase(ctx.ChangeChildren(extend.Ref(), std::move(nonDQ)));
- }
-
- return TExprBase(ctx.ChangeChild(dqReadWrap.Ref(), TDqReadWrapBase::idx_Input, std::move(newRead)));
- }
-
TMaybeNode<TExprBase> DirectRowInFlatMap(TExprBase node, TExprContext& ctx) const {
if (State_->Configuration->UseSystemColumns.Get().GetOrElse(DEFAULT_USE_SYS_COLUMNS)) {
return node;
@@ -2610,21 +2467,6 @@ protected:
.Build()
.Done();
}
- } else if (const auto maybeYtRead = unordered.Input().Maybe<TDqReadWrapBase>().Input().Maybe<TYtReadTable>()) {
- const auto ytRead = maybeYtRead.Cast();
- TExprNode::TListType sections(ytRead.Input().Size());
- for (auto i = 0U; i < sections.size(); ++i) {
- sections[i] = MakeUnorderedSection<true>(ytRead.Input().Item(i), ctx).Ptr();
- }
-
- auto updatedRead = Build<TYtReadTable>(ctx, ytRead.Pos())
- .InitFrom(ytRead)
- .Input()
- .Add(std::move(sections))
- .Build()
- .Done().Ptr();
-
- return TExprBase(ctx.ChangeChild(unordered.Input().Ref(), TDqReadWrapBase::idx_Input, std::move(updatedRead)));
}
return node;
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp
index 9d80f80afd..d28baa9ef5 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp
@@ -83,7 +83,6 @@ public:
} else {
AddHandler(0, Names({TYtMap::CallableName(), TYtMapReduce::CallableName()}), HNDL(ExtractKeyRangeLegacy));
}
- AddHandler(0, &TCoExtendBase::Match, HNDL(ExtendDqReadWraps));
AddHandler(0, &TCoExtendBase::Match, HNDL(Extend));
AddHandler(0, &TCoAssumeSorted::Match, HNDL(AssumeSorted));
AddHandler(0, &TYtMapReduce::Match, HNDL(AddTrivialMapperForNativeYtTypes));
@@ -104,7 +103,6 @@ public:
AddHandler(1, &TYtOutputOpBase::Match, HNDL(TableContentWithSettings));
AddHandler(1, &TYtOutputOpBase::Match, HNDL(NonOptimalTableContent));
AddHandler(1, &TCoRight::Match, HNDL(ReadWithSettings));
- AddHandler(1, &TDqReadWrapBase::Match, HNDL(DqReadWrapWithSettings));
AddHandler(1, &TYtTransientOpBase::Match, HNDL(PushDownKeyExtract));
AddHandler(1, &TYtTransientOpBase::Match, HNDL(TransientOpWithSettings));
AddHandler(1, &TYtSort::Match, HNDL(TopSort));
@@ -3456,77 +3454,6 @@ private:
return KeepColumnOrder(res.Ptr(), node.Ref(), ctx, *State_->Types);
}
- TMaybeNode<TExprBase> ExtendDqReadWraps(TExprBase node, TExprContext& ctx) const {
- auto extend = node.Cast<TCoExtendBase>();
-
- // TODO: group TYtReadTable by token/settings:
- // (Merge (DqReadWrap YtReadTable_group1) (DqReadWrap YtReadTable_group1) (DqReadWrap OtherReads))
-
- TExprNode::TPtr dqReadWrapNode;
- TExprNode::TPtr dataSource;
- TVector<TExprBase> worlds;
- TVector<TYtPath> paths;
- TString prevToken;
- TExprNode::TPtr settings;
-
- for (auto child: extend) {
- auto maybeDqReadWrap = child.Maybe<TDqReadWrap>();
- if (maybeDqReadWrap && !maybeDqReadWrap.Cast().Flags().Size()) {
- auto dqReadWrap = maybeDqReadWrap.Cast();
- auto maybeYtReadTable = dqReadWrap.Input().Maybe<TYtReadTable>();
- auto token = dqReadWrap.Token();
- TStringBuf tokenStr = token.IsValid() ? token.Name().Cast().Value() : TStringBuf();
- if (prevToken && prevToken != tokenStr) {
- return node;
- }
- dqReadWrapNode = dqReadWrap.Ptr();
- if (maybeYtReadTable) {
- auto ytReadTable = maybeYtReadTable.Cast();
-
- if (ytReadTable.Input().Size() != 1) {
- return node;
- }
-
- auto section = ytReadTable.Input().Item(0);
- if (settings && settings != section.Settings().Ptr()) {
- return node;
- }
- if (dataSource && dataSource != ytReadTable.DataSource().Ptr()) {
- return node;
- }
- dataSource = ytReadTable.DataSource().Ptr();
- settings = section.Settings().Ptr();
- paths.insert(paths.end(), section.Paths().begin(), section.Paths().end());
- worlds.push_back(ytReadTable.World());
- } else {
- return node;
- }
- prevToken = tokenStr;
- } else {
- return node;
- }
- }
-
- auto newWorld = Build<TCoSync>(ctx, node.Pos()).Add(worlds).Done();
-
- TYtReadTable read = Build<TYtReadTable>(ctx, node.Pos())
- .World(newWorld)
- .DataSource(dataSource)
- .Input()
- .Add()
- .Paths().Add(paths).Build()
- .Settings(settings)
- .Build()
- .Build()
- .Done();
-
- return Build<TDqReadWrap>(ctx, node.Pos())
- .Input(read)
- .Flags().Build()
- .Token(TExprBase(dqReadWrapNode).Cast<TDqReadWrap>().Token())
- .Done();
- }
-
TMaybeNode<TExprBase> Extend(TExprBase node, TExprContext& ctx) const {
auto extend = node.Cast<TCoExtendBase>();
@@ -5971,30 +5898,6 @@ private:
return node;
}
- TMaybeNode<TExprBase> DqReadWrapWithSettings(TExprBase node, TExprContext& ctx) const {
- auto maybeRead = node.Cast<TDqReadWrapBase>().Input().Maybe<TYtReadTable>();
- if (!maybeRead) {
- return node;
- }
-
- TYtDSource dataSource = GetDataSource(maybeRead.Cast(), ctx);
- if (!State_->Configuration->_EnableYtPartitioning.Get(dataSource.Cluster().StringValue()).GetOrElse(false)) {
- return node;
- }
-
- auto read = maybeRead.Cast().Ptr();
- TSyncMap syncList;
- auto ret = OptimizeReadWithSettings(read, false, false, syncList, State_, ctx);
- if (ret != read) {
- if (ret) {
- YQL_ENSURE(syncList.empty());
- return TExprBase(ctx.ChangeChild(node.Ref(), TDqReadWrapBase::idx_Input, std::move(ret)));
- }
- return {};
- }
- return node;
- }
-
TMaybeNode<TExprBase> TransientOpWithSettings(TExprBase node, TExprContext& ctx) const {
auto op = node.Cast<TYtTransientOpBase>();
diff --git a/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json
index 4c7c031481..0d09422187 100644
--- a/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json
@@ -3535,23 +3535,23 @@
],
"test.test[optimizers-group_visit_lambdas--Analyze]": [
{
- "checksum": "dbd558e60768523c0770d01fafdf02fd",
- "size": 10096,
- "uri": "https://storage.yandex-team.ru/get-devtools/1903885/92966575648953dd33617f86cb7d011f63776c02/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Analyze_/plan.txt"
+ "checksum": "f67d1134a9741f02bf7fd74e3741e806",
+ "size": 8538,
+ "uri": "https://storage.yandex-team.ru/get-devtools/1942173/2b6d37b434944472410a121082ca65dee724c848/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Analyze_/plan.txt"
}
],
"test.test[optimizers-group_visit_lambdas--Debug]": [
{
- "checksum": "67b0359fa47918adc10cd9243eb343ea",
- "size": 4137,
- "uri": "https://storage.yandex-team.ru/get-devtools/1599023/4953cf4336fe0f36631097ce0dd29ae39daef2ac/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Debug_/opt.yql_patched"
+ "checksum": "125e2c294928f783c1a47a5bc59455a9",
+ "size": 3885,
+ "uri": "https://storage.yandex-team.ru/get-devtools/1942415/7197d6f538e589afc0cd6fe1285c07fd138fb450/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Debug_/opt.yql_patched"
}
],
"test.test[optimizers-group_visit_lambdas--Plan]": [
{
- "checksum": "dbd558e60768523c0770d01fafdf02fd",
- "size": 10096,
- "uri": "https://storage.yandex-team.ru/get-devtools/1942671/bb537deceaffedf5a0b76b9b9b6eef9131895ce9/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Plan_/plan.txt"
+ "checksum": "f67d1134a9741f02bf7fd74e3741e806",
+ "size": 8538,
+ "uri": "https://storage.yandex-team.ru/get-devtools/1942173/2b6d37b434944472410a121082ca65dee724c848/resource.tar.gz#test.test_optimizers-group_visit_lambdas--Plan_/plan.txt"
}
],
"test.test[optimizers-group_visit_lambdas--Results]": [],
@@ -5508,16 +5508,16 @@
],
"test.test[window-row_number_no_part_multi_input-default.txt-Debug]": [
{
- "checksum": "5fdeb5b450d9ece72edf13994cc2d0fd",
- "size": 4237,
- "uri": "https://storage.yandex-team.ru/get-devtools/1814674/64e60e91443ad3deb3ec9f35bbd96fa975cf27f9/resource.tar.gz#test.test_window-row_number_no_part_multi_input-default.txt-Debug_/opt.yql_patched"
+ "checksum": "9e7aa3db17638cc6db0cb597e79157a9",
+ "size": 4016,
+ "uri": "https://storage.yandex-team.ru/get-devtools/1942415/7197d6f538e589afc0cd6fe1285c07fd138fb450/resource.tar.gz#test.test_window-row_number_no_part_multi_input-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[window-row_number_no_part_multi_input-default.txt-Plan]": [
{
- "checksum": "eaf54514644eb358ca323c8011a792c5",
- "size": 12630,
- "uri": "https://storage.yandex-team.ru/get-devtools/1942671/4db54c8ba9dedccdc8391210d1657c5ca4bd34ec/resource.tar.gz#test.test_window-row_number_no_part_multi_input-default.txt-Plan_/plan.txt"
+ "checksum": "834e185938e25a9a228ab14b7e9cd4df",
+ "size": 11231,
+ "uri": "https://storage.yandex-team.ru/get-devtools/1942173/2b6d37b434944472410a121082ca65dee724c848/resource.tar.gz#test.test_window-row_number_no_part_multi_input-default.txt-Plan_/plan.txt"
}
],
"test.test[window-row_number_no_part_multi_input-default.txt-Results]": [],
diff --git a/ydb/library/yql/tests/sql/dq_file/part7/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part7/canondata/result.json
index 033d81ef7c..4ce6b3959f 100644
--- a/ydb/library/yql/tests/sql/dq_file/part7/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part7/canondata/result.json
@@ -2996,23 +2996,23 @@
"test.test[key_filter-lambda_with_null_filter--Results]": [],
"test.test[key_filter-mixed_sort--Analyze]": [
{
- "checksum": "82bd104b7140df61dd8704b8cbdf285b",
- "size": 12814,
- "uri": "https://storage.yandex-team.ru/get-devtools/1931696/0a5f01ad7bf7c863b92eab0e8aff7f87ecb60e51/resource.tar.gz#test.test_key_filter-mixed_sort--Analyze_/plan.txt"
+ "checksum": "a3f0ef78e59682e3c13e237f4ae1ed53",
+ "size": 9378,
+ "uri": "https://storage.yandex-team.ru/get-devtools/1773845/f96aa61ced92a0dfdc80e9b0bdf6f2a5cc2d3078/resource.tar.gz#test.test_key_filter-mixed_sort--Analyze_/plan.txt"
}
],
"test.test[key_filter-mixed_sort--Debug]": [
{
- "checksum": "86dd8a54da71064e756cc19f63330d09",
- "size": 4330,
- "uri": "https://storage.yandex-team.ru/get-devtools/1931696/0a5f01ad7bf7c863b92eab0e8aff7f87ecb60e51/resource.tar.gz#test.test_key_filter-mixed_sort--Debug_/opt.yql_patched"
+ "checksum": "9e78d689b65f563bedaee3d0aafbb228",
+ "size": 3796,
+ "uri": "https://storage.yandex-team.ru/get-devtools/1773845/f96aa61ced92a0dfdc80e9b0bdf6f2a5cc2d3078/resource.tar.gz#test.test_key_filter-mixed_sort--Debug_/opt.yql_patched"
}
],
"test.test[key_filter-mixed_sort--Plan]": [
{
- "checksum": "82bd104b7140df61dd8704b8cbdf285b",
- "size": 12814,
- "uri": "https://storage.yandex-team.ru/get-devtools/1937367/24a3ed09a524cab36402a50f39546eeec677142d/resource.tar.gz#test.test_key_filter-mixed_sort--Plan_/plan.txt"
+ "checksum": "a3f0ef78e59682e3c13e237f4ae1ed53",
+ "size": 9378,
+ "uri": "https://storage.yandex-team.ru/get-devtools/1773845/f96aa61ced92a0dfdc80e9b0bdf6f2a5cc2d3078/resource.tar.gz#test.test_key_filter-mixed_sort--Plan_/plan.txt"
}
],
"test.test[key_filter-mixed_sort--Results]": [],