aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoman Udovichenko <rvu@ydb.tech>2025-02-11 15:21:36 +0300
committerGitHub <noreply@github.com>2025-02-11 15:21:36 +0300
commitf3c562b0c975dbfe2d26c852e2cad27d0bf6fd90 (patch)
treee6efbb6960fc75ccbcef0307b2bf57275173f72a
parent0e1fdd6b640f9f771f4a4f5981cb8749f32ceade (diff)
downloadydb-f3c562b0c975dbfe2d26c852e2cad27d0bf6fd90.tar.gz
Move DqReplicate check from type_ann to optimizers (#14426)
Co-authored-by: Roman Udovichenko <udovichenko-r@localhost.localdomain>
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp11
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp7
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp17
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h2
4 files changed, 17 insertions, 20 deletions
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index 98691d51131..e9578c70509 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -20,12 +20,16 @@ using namespace NYql::NNodes;
class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
public:
TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config)
- : TOptimizeTransformerBase(typeCtx, NLog::EComponent::ProviderDq, {})
+ : TOptimizeTransformerBase(/* TODO: typeCtx*/nullptr, NLog::EComponent::ProviderDq, {})
, Config(config)
{
const bool enablePrecompute = Config->_EnablePrecompute.Get().GetOrElse(false);
+ const bool enableDqReplicate = Config->IsDqReplicateEnabled(*typeCtx);
#define HNDL(name) "DqsPhy-"#name, Hndl(&TDqsPhysicalOptProposalTransformer::name)
+ if (!enableDqReplicate) {
+ AddHandler(0, &TDqReplicate::Match, HNDL(FailOnDqReplicate));
+ }
AddHandler(0, &TDqSourceWrap::Match, HNDL(BuildStageWithSourceWrap));
AddHandler(0, &TDqReadWrap::Match, HNDL(BuildStageWithReadWrap));
AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>));
@@ -104,6 +108,11 @@ public:
}
protected:
+ TMaybeNode<TExprBase> FailOnDqReplicate(TExprBase node, TExprContext& ctx) {
+ ctx.AddError(YqlIssue(ctx.GetPosition(node.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, "Reading multiple times from the same source is not supported"));
+ return {};
+ }
+
TMaybeNode<TExprBase> BuildStageWithSourceWrap(TExprBase node, TExprContext& ctx) {
return DqBuildStageWithSourceWrap(node, ctx);
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
index 8e7f2fdf04d..775fcea8e66 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
@@ -32,12 +32,9 @@ public:
TDqDataProviderSink(const TDqState::TPtr& state)
: State(state)
, LogOptTransformer([state] () { return CreateDqsLogOptTransformer(state->TypeCtx, state->Settings); })
- , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings); })
+ , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(state->TypeCtx, state->Settings); })
, PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); })
- , TypeAnnotationTransformer([state] () {
- return CreateDqsDataSinkTypeAnnotationTransformer(
- state->TypeCtx, state->Settings->IsDqReplicateEnabled(*state->TypeCtx));
- })
+ , TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer(state->TypeCtx); })
, ConstraintsTransformer([] () { return CreateDqDataSinkConstraintTransformer(); })
, RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); })
{ }
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
index adcce6805a7..d565b7eba8b 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
@@ -14,7 +14,7 @@ namespace {
class TDqsDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
public:
- TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate)
+ TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx)
: TVisitorTransformerBase(true), TypeCtx(typeCtx)
{
AddHandler({TDqStage::CallableName()}, Hndl(&NDq::AnnotateDqStage));
@@ -28,11 +28,7 @@ public:
AddHandler({TDqCnBroadcast::CallableName()}, Hndl(&NDq::AnnotateDqConnection));
AddHandler({TDqCnValue::CallableName()}, Hndl(&NDq::AnnotateDqCnValue));
AddHandler({TDqCnMerge::CallableName()}, Hndl(&NDq::AnnotateDqCnMerge));
- if (enableDqReplicate) {
- AddHandler({TDqReplicate::CallableName()}, Hndl(&NDq::AnnotateDqReplicate));
- } else {
- AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqReplicateAlwaysError));
- }
+ AddHandler({TDqReplicate::CallableName()}, Hndl(&NDq::AnnotateDqReplicate));
AddHandler({TDqJoin::CallableName()}, Hndl(&NDq::AnnotateDqJoin));
AddHandler({TDqPhyGraceJoin::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin));
AddHandler({TDqPhyMapJoin::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin));
@@ -47,11 +43,6 @@ public:
}
private:
- TStatus AnnotateDqReplicateAlwaysError(const TExprNode::TPtr& input, TExprContext& ctx) {
- ctx.AddError(YqlIssue(ctx.GetPosition(input->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, "Reading multiple times from the same source is not supported"));
- return TStatus::Error;
- }
-
TStatus AnnotateDqWrite(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
if (!EnsureMinArgsCount(*input, 2, ctx)) {
return TStatus::Error;
@@ -86,8 +77,8 @@ private:
} // unnamed
-THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate) {
- return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx, enableDqReplicate));
+THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) {
+ return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx));
}
} // NYql
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h
index f43af138b52..0347602ba0b 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h
@@ -7,6 +7,6 @@
namespace NYql {
-THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate);
+THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx);
} // NYql