aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-03-20 15:39:27 +0300
committerssmike <ssmike@ydb.tech>2023-03-20 15:39:27 +0300
commit9f02f8dc82cbb72bff77b003b2ca8d79c52792a3 (patch)
tree97df8c549df0ffeec7b3dedf621df142774bd774
parentaac6932b47498fd2f8304a434d80e368484dece6 (diff)
downloadydb-9f02f8dc82cbb72bff77b003b2ca8d79c52792a3.tar.gz
Prepare to enable extract predicate
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp2
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.cpp7
-rw-r--r--ydb/core/kqp/opt/kqp_opt_kql.cpp4
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp12
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp2
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.h2
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp36
-rw-r--r--ydb/core/kqp/ut/scan/kqp_scan_ut.cpp14
-rw-r--r--ydb/core/kqp/ut/scan/kqp_split_ut.cpp4
-rw-r--r--ydb/core/protos/config.proto6
-rw-r--r--ydb/core/testlib/basics/feature_flags.h1
-rw-r--r--ydb/library/yql/dq/opt/dq_opt.cpp20
-rw-r--r--ydb/library/yql/dq/opt/dq_opt.h2
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp19
15 files changed, 78 insertions, 55 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index c66f0817b5f..605f64fdb7c 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -352,6 +352,8 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableKqpScanQueryStreamLookup = serviceConfig.GetEnableKqpScanQueryStreamLookup();
kqpConfig.EnableKqpDataQueryStreamPointLookup = serviceConfig.GetEnableKqpDataQueryStreamPointLookup();
kqpConfig.EnableKqpScanQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin();
+ kqpConfig.EnablePredicateExtractForDataQuery = serviceConfig.GetEnablePredicateExtractForDataQueries();
+ kqpConfig.EnablePredicateExtractForScanQuery = serviceConfig.GetEnablePredicateExtractForScanQueries();
}
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index 711b9e927bc..0949fc95a1a 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -368,6 +368,9 @@ private:
bool enableKqpDataQuerySourceRead = Config.GetEnableKqpDataQuerySourceRead();
bool enableKqpScanQuerySourceRead = Config.GetEnableKqpScanQuerySourceRead();
+ bool enableKqpDataQueryPredicateExtract = Config.GetEnablePredicateExtractForDataQueries();
+ bool enableKqpScanQueryPredicateExtract = Config.GetEnablePredicateExtractForScanQueries();
+
Config.Swap(event.MutableConfig()->MutableTableServiceConfig());
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
@@ -378,7 +381,9 @@ private:
Config.GetEnableKqpScanQueryStreamLookup() != enableKqpScanQueryStreamLookup ||
Config.GetEnableKqpScanQueryStreamIdxLookupJoin() != enableKqpScanQueryStreamIdxLookupJoin ||
Config.GetEnableKqpDataQuerySourceRead() != enableKqpDataQuerySourceRead ||
- Config.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead) {
+ Config.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead ||
+ Config.GetEnablePredicateExtractForDataQueries() != enableKqpDataQueryPredicateExtract ||
+ Config.GetEnablePredicateExtractForScanQueries() != enableKqpScanQueryPredicateExtract) {
LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE,
"Iterator read flags was changed. StreamLookup from " << enableKqpDataQueryStreamLookup <<
diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp
index 957255b6710..1e03a8de445 100644
--- a/ydb/core/kqp/opt/kqp_opt_kql.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp
@@ -88,11 +88,11 @@ bool UseReadTableRanges(const TKikimrTableDescription& tableData, const TIntrusi
return predicateExtractSetting == EOptionalFlag::Enabled;
}
- if (kqpCtx->IsScanQuery() && kqpCtx->Config->FeatureFlags.GetEnablePredicateExtractForScanQueries()) {
+ if (kqpCtx->IsScanQuery() && kqpCtx->Config->EnablePredicateExtractForScanQuery) {
return true;
}
- if (kqpCtx->IsDataQuery() && kqpCtx->Config->FeatureFlags.GetEnablePredicateExtractForDataQueries()) {
+ if (kqpCtx->IsDataQuery() && kqpCtx->Config->EnablePredicateExtractForDataQuery) {
return true;
}
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index 37f9c2e9e39..107cc3d410b 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -312,6 +312,7 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp
.Done();
}
+
TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx, const TParentsMap& parents)
{
@@ -357,7 +358,12 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx,
.BuildNode(ctx, read.Pos()))
.Done();
} else {
- auto connections = FindDqConnections(node);
+ TVector<TDqConnection> connections;
+ bool isPure;
+ FindDqConnections(node, connections, isPure);
+ if (!isPure) {
+ return node;
+ }
YQL_ENSURE(!connections.empty());
TVector<TDqConnection> inputs;
TVector<TExprBase> stageInputs;
@@ -370,8 +376,8 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx,
return node;
}
- if (!IsSingleConsumerConnection(input, parents, false)) {
- continue;
+ if (!IsSingleConsumerConnection(input, parents, true)) {
+ return node;
}
inputs.push_back(input);
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
index 333f78d3f0f..a32ef61a338 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
@@ -176,7 +176,7 @@ TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqp
auto input = maybeSkip ? maybeSkip.Cast().Input() : take.Input();
bool isReadTable = input.Maybe<TKqpReadTable>().IsValid();
- bool isReadTableRanges = input.Maybe<TKqlReadTableRangesBase>().IsValid();
+ bool isReadTableRanges = input.Maybe<TKqpReadTableRanges>().IsValid() || input.Maybe<TKqpReadOlapTableRanges>().IsValid() ;
if (!isReadTable && !isReadTableRanges) {
return node;
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
index a5718063ea2..4b09bf06556 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
@@ -169,7 +169,7 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK
return KqpRemoveRedundantSortByPkBase(node, ctx, kqpCtx,
[&](TExprBase input) -> TMaybe<TTableData> {
bool isReadTable = input.Maybe<TKqpReadTable>().IsValid();
- bool isReadTableRanges = input.Maybe<TKqlReadTableRangesBase>().IsValid();
+ bool isReadTableRanges = input.Maybe<TKqpReadTableRanges>().IsValid() || input.Maybe<TKqpReadOlapTableRanges>().IsValid() ;
if (!isReadTable && !isReadTableRanges) {
return Nothing();
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h
index f10ddec1a8d..08127e4c3d8 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.h
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.h
@@ -135,6 +135,8 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableKqpDataQueryStreamLookup = false;
bool EnableKqpDataQueryStreamPointLookup = false;
bool EnableKqpScanQueryStreamIdxLookupJoin = false;
+ bool EnablePredicateExtractForScanQuery = true;
+ bool EnablePredicateExtractForDataQuery = false;
};
}
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index e4f54b8ba98..81acae2b567 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -3273,8 +3273,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST(PushFlatmapInnerConnectionsToStageInput) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
auto settings = TKikimrSettings()
- .SetEnablePredicateExtractForDataQueries(false);
+ .SetAppConfig(app);
TKikimrRunner kikimr{settings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -3376,8 +3378,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST(MultiUsageInnerConnection) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
auto settings = TKikimrSettings()
- .SetEnablePredicateExtractForDataQueries(false);
+ .SetAppConfig(app);
TKikimrRunner kikimr{settings};
auto db = kikimr.GetTableClient();
@@ -3480,8 +3484,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST(FlatmapLambdaMutiusedConnections) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
auto settings = TKikimrSettings()
- .SetEnablePredicateExtractForDataQueries(false);
+ .SetAppConfig(app);
TKikimrRunner kikimr{settings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -3526,8 +3532,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST(FlatMapLambdaInnerPrecompute) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
auto settings = TKikimrSettings()
- .SetEnablePredicateExtractForDataQueries(false);
+ .SetAppConfig(app);
TKikimrRunner kikimr{settings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -3549,10 +3557,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForDataQueries(true);
- settings.SetFeatureFlags(flags);
settings.SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
@@ -3582,9 +3588,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForDataQueries(true);
- settings.SetFeatureFlags(flags);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
settings.SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
@@ -3605,9 +3609,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForDataQueries(true);
- settings.SetFeatureFlags(flags);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
settings.SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
@@ -3638,10 +3640,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForDataQueries(true);
- settings.SetFeatureFlags(flags);
settings.SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
@@ -3668,10 +3668,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForDataQueries(true);
- settings.SetFeatureFlags(flags);
settings.SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
index c5b85b3cab6..f2797e3df87 100644
--- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
@@ -1137,7 +1137,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_UNIT_TEST_TWIN(PrunePartitionsByLiteral, WithPredicatesExtract) {
auto cfg = AppCfg();
- cfg.MutableFeatureFlags()->SetEnablePredicateExtractForScanQueries(WithPredicatesExtract);
+ cfg.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(WithPredicatesExtract);
auto kikimr = DefaultKikimrRunner({}, cfg);
auto db = kikimr.GetTableClient();
@@ -1979,10 +1979,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForScanQueries(true);
- settings.SetFeatureFlags(flags);
settings.SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
@@ -2001,10 +1999,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForScanQueries(true);
- settings.SetFeatureFlags(flags);
settings.SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
@@ -2024,10 +2020,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForDataQueries(true);
- settings.SetFeatureFlags(flags);
settings.SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
index 302090aa6c5..bb977755f08 100644
--- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
@@ -380,10 +380,8 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForScanQueries(true);
- settings.SetFeatureFlags(flags);
settings.SetAppConfig(appConfig);
Kikimr.ConstructInPlace(settings);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 96bc141bd9c..0997fb53d81 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -762,13 +762,13 @@ message TFeatureFlags {
optional bool EnablePublicApiExternalBlobs = 59 [default = false];
optional bool EnablePublicApiKeepInMemory = 60 [default = false];
optional bool EnableImplicitScanQueryInScripts = 61 [default = true];
- optional bool EnablePredicateExtractForScanQueries = 62 [default = true];
+ reserved 62; // EnablePredicateExtractForScanQueries
optional bool AllowVDiskDefrag = 63 [default = true];
optional bool EnableAsyncHttpMon = 64 [default = true];
optional bool EnableChangefeeds = 65 [default = true];
reserved 66; // EnableKqpScanQueryStreamLookup
optional bool EnableKqpScanQueryMultipleOlapShardsReads = 67 [default = false];
- optional bool EnablePredicateExtractForDataQueries = 68 [default = true];
+ reserved 68; // EnablePredicateExtractForDataQueries;
reserved 69; // optional bool EnableKqpPatternCacheLiteral = 69 [default = false];
optional bool EnableMoveIndex = 70 [default = true];
// enable http handle for self termination
@@ -1270,6 +1270,8 @@ message TTableServiceConfig {
optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false];
optional bool EnablePublishKqpProxyByRM = 34 [default = false];
optional bool EnableKqpScanQueryStreamIdxLookupJoin = 35 [default = false];
+ optional bool EnablePredicateExtractForScanQueries = 36 [default = true];
+ optional bool EnablePredicateExtractForDataQueries = 37 [default = true];
};
// Config describes immediate controls and allows
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index 4f2c8993cbf..a4b36faf0da 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -34,7 +34,6 @@ public:
FEATURE_FLAG_SETTER(EnableBulkUpsertToAsyncIndexedTables)
FEATURE_FLAG_SETTER(EnableChangefeeds)
FEATURE_FLAG_SETTER(EnableMoveIndex)
- FEATURE_FLAG_SETTER(EnablePredicateExtractForDataQueries)
FEATURE_FLAG_SETTER(EnableNotNullDataColumns)
FEATURE_FLAG_SETTER(EnableArrowFormatAtDatashard)
FEATURE_FLAG_SETTER(EnableGrpcAudit)
diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp
index bd8bfd6abe9..f27f3949f5a 100644
--- a/ydb/library/yql/dq/opt/dq_opt.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt.cpp
@@ -133,10 +133,15 @@ ui32 GetStageOutputsCount(const TDqStageBase& stage) {
return resultsTypeTuple->GetSize();
}
-TVector<TDqConnection> FindDqConnections(const TExprBase& node) {
- TVector<TDqConnection> connections;
+bool IsDqPureNode(const TExprBase& node) {
+ return !node.Maybe<TDqSource>() &&
+ !node.Maybe<TDqConnection>() &&
+ !node.Maybe<TDqPrecompute>();
+}
- VisitExpr(node.Ptr(), [&connections](const TExprNode::TPtr& exprNode) {
+void FindDqConnections(const TExprBase& node, TVector<TDqConnection>& connections, bool& isPure) {
+ isPure = true;
+ VisitExpr(node.Ptr(), [&](const TExprNode::TPtr& exprNode) {
TExprBase node(exprNode);
if (node.Maybe<TDqPhyPrecompute>()) {
@@ -149,10 +154,12 @@ TVector<TDqConnection> FindDqConnections(const TExprBase& node) {
return false;
}
+ if (!IsDqPureNode(node)) {
+ isPure = false;
+ }
+
return true;
});
-
- return connections;
}
bool IsDqPureExpr(const TExprBase& node, bool isPrecomputePure) {
@@ -161,8 +168,7 @@ bool IsDqPureExpr(const TExprBase& node, bool isPrecomputePure) {
};
auto predicate = [](const TExprNode::TPtr& node) {
- return TMaybeNode<TDqSource>(node).IsValid() ||
- TMaybeNode<TDqConnection>(node).IsValid();
+ return !IsDqPureNode(TExprBase(node));
};
if (isPrecomputePure) {
diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h
index a659a7ddab0..c49a97266ea 100644
--- a/ydb/library/yql/dq/opt/dq_opt.h
+++ b/ydb/library/yql/dq/opt/dq_opt.h
@@ -47,7 +47,7 @@ bool IsSingleConsumerConnection(const NNodes::TDqConnection& node, const TParent
ui32 GetStageOutputsCount(const NNodes::TDqStageBase& stage);
-TVector<NNodes::TDqConnection> FindDqConnections(const NNodes::TExprBase& node);
+void FindDqConnections(const NNodes::TExprBase& node, TVector<NNodes::TDqConnection>&, bool&);
bool IsDqPureExpr(const NNodes::TExprBase& node, bool isPrecomputePure = true);
bool IsDqSelfContainedExpr(const NNodes::TExprBase& node);
bool IsDqDependsOnStage(const NNodes::TExprBase& node, const NNodes::TDqStageBase& stage);
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index c3f43485571..7a37df04858 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -734,8 +734,10 @@ TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) {
return node;
}
- auto innerConnections = FindDqConnections(flatmap.Lambda());
- if (innerConnections.empty()) {
+ bool isPure;
+ TVector<TDqConnection> innerConnections;
+ FindDqConnections(flatmap.Lambda(), innerConnections, isPure);
+ if (!isPure || innerConnections.empty()) {
return node;
}
@@ -776,7 +778,12 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo
return node;
}
- auto innerConnections = FindDqConnections(flatmap.Lambda());
+ bool isPure;
+ TVector<TDqConnection> innerConnections;
+ FindDqConnections(flatmap.Lambda(), innerConnections, isPure);
+ if (!isPure) {
+ return node;
+ }
TMaybeNode<TDqStage> flatmapStage;
if (!innerConnections.empty()) {
@@ -2634,15 +2641,19 @@ TExprBase DqPrecomputeToInput(const TExprBase& node, TExprContext& ctx) {
TExprNode::TListType newArgs;
TNodeOnNodeOwnedMap replaces;
+ TNodeOnNodeOwnedMap inputsReplaces;
+
for (ui64 i = 0; i < stage.Inputs().Size(); ++i) {
newInputs.push_back(stage.Inputs().Item(i).Ptr());
auto arg = stage.Program().Args().Arg(i).Raw();
newArgs.push_back(ctx.NewArgument(arg->Pos(), arg->Content()));
replaces[arg] = newArgs.back();
+
+ inputsReplaces[arg] = stage.Inputs().Item(i).Ptr();
}
for (auto& precompute: innerPrecomputes) {
- newInputs.push_back(precompute);
+ newInputs.push_back(ctx.ReplaceNodes(TExprNode::TPtr(precompute), inputsReplaces));
newArgs.push_back(ctx.NewArgument(precompute->Pos(), TStringBuilder() << "_dq_precompute_" << newArgs.size()));
replaces[precompute.Get()] = newArgs.back();
}