diff options
author | Roman Udovichenko <rvu@ydb.tech> | 2025-02-11 15:21:36 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-11 15:21:36 +0300 |
commit | f3c562b0c975dbfe2d26c852e2cad27d0bf6fd90 (patch) | |
tree | e6efbb6960fc75ccbcef0307b2bf57275173f72a | |
parent | 0e1fdd6b640f9f771f4a4f5981cb8749f32ceade (diff) | |
download | ydb-f3c562b0c975dbfe2d26c852e2cad27d0bf6fd90.tar.gz |
Move DqReplicate check from type_ann to optimizers (#14426)
Co-authored-by: Roman Udovichenko <udovichenko-r@localhost.localdomain>
4 files changed, 17 insertions, 20 deletions
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 98691d51131..e9578c70509 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -20,12 +20,16 @@ using namespace NYql::NNodes; class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase { public: TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config) - : TOptimizeTransformerBase(typeCtx, NLog::EComponent::ProviderDq, {}) + : TOptimizeTransformerBase(/* TODO: typeCtx*/nullptr, NLog::EComponent::ProviderDq, {}) , Config(config) { const bool enablePrecompute = Config->_EnablePrecompute.Get().GetOrElse(false); + const bool enableDqReplicate = Config->IsDqReplicateEnabled(*typeCtx); #define HNDL(name) "DqsPhy-"#name, Hndl(&TDqsPhysicalOptProposalTransformer::name) + if (!enableDqReplicate) { + AddHandler(0, &TDqReplicate::Match, HNDL(FailOnDqReplicate)); + } AddHandler(0, &TDqSourceWrap::Match, HNDL(BuildStageWithSourceWrap)); AddHandler(0, &TDqReadWrap::Match, HNDL(BuildStageWithReadWrap)); AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>)); @@ -104,6 +108,11 @@ public: } protected: + TMaybeNode<TExprBase> FailOnDqReplicate(TExprBase node, TExprContext& ctx) { + ctx.AddError(YqlIssue(ctx.GetPosition(node.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, "Reading multiple times from the same source is not supported")); + return {}; + } + TMaybeNode<TExprBase> BuildStageWithSourceWrap(TExprBase node, TExprContext& ctx) { return DqBuildStageWithSourceWrap(node, ctx); } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp index 8e7f2fdf04d..775fcea8e66 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp @@ -32,12 +32,9 @@ public: TDqDataProviderSink(const TDqState::TPtr& state) : State(state) , LogOptTransformer([state] () { return CreateDqsLogOptTransformer(state->TypeCtx, state->Settings); }) - , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings); }) + , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(state->TypeCtx, state->Settings); }) , PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); }) - , TypeAnnotationTransformer([state] () { - return CreateDqsDataSinkTypeAnnotationTransformer( - state->TypeCtx, state->Settings->IsDqReplicateEnabled(*state->TypeCtx)); - }) + , TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer(state->TypeCtx); }) , ConstraintsTransformer([] () { return CreateDqDataSinkConstraintTransformer(); }) , RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); }) { } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp index adcce6805a7..d565b7eba8b 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp @@ -14,7 +14,7 @@ namespace { class TDqsDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase { public: - TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate) + TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) : TVisitorTransformerBase(true), TypeCtx(typeCtx) { AddHandler({TDqStage::CallableName()}, Hndl(&NDq::AnnotateDqStage)); @@ -28,11 +28,7 @@ public: AddHandler({TDqCnBroadcast::CallableName()}, Hndl(&NDq::AnnotateDqConnection)); AddHandler({TDqCnValue::CallableName()}, Hndl(&NDq::AnnotateDqCnValue)); AddHandler({TDqCnMerge::CallableName()}, Hndl(&NDq::AnnotateDqCnMerge)); - if (enableDqReplicate) { - AddHandler({TDqReplicate::CallableName()}, Hndl(&NDq::AnnotateDqReplicate)); - } else { - AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqReplicateAlwaysError)); - } + AddHandler({TDqReplicate::CallableName()}, Hndl(&NDq::AnnotateDqReplicate)); AddHandler({TDqJoin::CallableName()}, Hndl(&NDq::AnnotateDqJoin)); AddHandler({TDqPhyGraceJoin::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin)); AddHandler({TDqPhyMapJoin::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin)); @@ -47,11 +43,6 @@ public: } private: - TStatus AnnotateDqReplicateAlwaysError(const TExprNode::TPtr& input, TExprContext& ctx) { - ctx.AddError(YqlIssue(ctx.GetPosition(input->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, "Reading multiple times from the same source is not supported")); - return TStatus::Error; - } - TStatus AnnotateDqWrite(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { if (!EnsureMinArgsCount(*input, 2, ctx)) { return TStatus::Error; @@ -86,8 +77,8 @@ private: } // unnamed -THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate) { - return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx, enableDqReplicate)); +THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) { + return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx)); } } // NYql diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h index f43af138b52..0347602ba0b 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h @@ -7,6 +7,6 @@ namespace NYql { -THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate); +THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx); } // NYql |