diff options
author | whcrc <whcrc@ydb.tech> | 2022-08-24 11:37:01 +0300 |
---|---|---|
committer | whcrc <whcrc@ydb.tech> | 2022-08-24 11:37:01 +0300 |
commit | 26dcb3705729c807903c7c1d5757cb950e703c1a (patch) | |
tree | 3894101a09c9cc256e1127bc2a15b0f72a2e7a71 | |
parent | db2a536de1654fd420176ebde60a29222a546839 (diff) | |
download | ydb-26dcb3705729c807903c7c1d5757cb950e703c1a.tar.gz |
fallback on DqReplicate
5 files changed, 21 insertions, 6 deletions
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index 0cfedf42d2b..333edf948c9 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -47,6 +47,7 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, _FallbackOnRuntimeErrors); REGISTER_SETTING(*this, WorkerFilter); REGISTER_SETTING(*this, _EnablePrecompute); + REGISTER_SETTING(*this, EnableDqReplicate); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index e3dbf3320fe..11b15bbda02 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -27,6 +27,7 @@ struct TDqSettings { static constexpr ui64 ChannelBufferSize = 2000_MB; static constexpr ui64 OutputChunkMaxSize = 4_MB; static constexpr ui64 ChunkSizeLimit = 128_MB; + static constexpr bool EnableDqReplicate = false; }; using TPtr = std::shared_ptr<TDqSettings>; @@ -69,6 +70,7 @@ struct TDqSettings { NCommon::TConfSetting<bool, false> _OneGraphPerQuery; NCommon::TConfSetting<TString, false> _FallbackOnRuntimeErrors; NCommon::TConfSetting<bool, false> _EnablePrecompute; + NCommon::TConfSetting<bool, false> EnableDqReplicate; NCommon::TConfSetting<TString, false> WorkerFilter; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp index cdbcc43669e..b136bf2ab18 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp @@ -30,7 +30,10 @@ public: , LogOptTransformer([state] () { return CreateDqsLogOptTransformer(state->TypeCtx, state->Settings); }) , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings); }) , PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); }) - , TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer(state->TypeCtx); }) + , TypeAnnotationTransformer([state] () { + return CreateDqsDataSinkTypeAnnotationTransformer( + state->TypeCtx, state->Settings->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate)); + }) , RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); }) { } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp index 1c1af7bb077..48553e92527 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp @@ -13,7 +13,7 @@ namespace { class TDqsDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase { public: - TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) + TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate) : TVisitorTransformerBase(true), TypeCtx(typeCtx) { AddHandler({TDqStage::CallableName()}, Hndl(&NDq::AnnotateDqStage)); @@ -26,7 +26,11 @@ public: AddHandler({TDqCnBroadcast::CallableName()}, Hndl(&NDq::AnnotateDqConnection)); AddHandler({TDqCnValue::CallableName()}, Hndl(&NDq::AnnotateDqCnValue)); AddHandler({TDqCnMerge::CallableName()}, Hndl(&NDq::AnnotateDqCnMerge)); - AddHandler({TDqReplicate::CallableName()}, Hndl(&NDq::AnnotateDqReplicate)); + if (enableDqReplicate) { + AddHandler({TDqReplicate::CallableName()}, Hndl(&NDq::AnnotateDqReplicate)); + } else { + AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqReplicateAlwaysError)); + } AddHandler({TDqJoin::CallableName()}, Hndl(&NDq::AnnotateDqJoin)); AddHandler({TDqPhyMapJoin::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin)); AddHandler({TDqPhyCrossJoin::CallableName()}, Hndl(&NDq::AnnotateDqCrossJoin)); @@ -40,6 +44,11 @@ public: } private: + TStatus AnnotateDqReplicateAlwaysError(const TExprNode::TPtr& input, TExprContext& ctx) { + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "DqReplicate is not supported by DQ")); + return TStatus::Error; + } + TStatus AnnotateDqWrite(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { if (!EnsureMinArgsCount(*input, 2, ctx)) { return TStatus::Error; @@ -74,8 +83,8 @@ private: } // unnamed -THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) { - return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx)); +THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate) { + return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx, enableDqReplicate)); } } // NYql
\ No newline at end of file diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h index df86def2cd4..5af1d1efa13 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h @@ -7,6 +7,6 @@ namespace NYql { -THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx); +THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate); } // NYql |