aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@ydb.tech>2022-08-24 11:37:01 +0300
committerwhcrc <whcrc@ydb.tech>2022-08-24 11:37:01 +0300
commit26dcb3705729c807903c7c1d5757cb950e703c1a (patch)
tree3894101a09c9cc256e1127bc2a15b0f72a2e7a71
parentdb2a536de1654fd420176ebde60a29222a546839 (diff)
downloadydb-26dcb3705729c807903c7c1d5757cb950e703c1a.tar.gz
fallback on DqReplicate
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.cpp1
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h2
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp5
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp17
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h2
5 files changed, 21 insertions, 6 deletions
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 0cfedf42d2b..333edf948c9 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -47,6 +47,7 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, _FallbackOnRuntimeErrors);
REGISTER_SETTING(*this, WorkerFilter);
REGISTER_SETTING(*this, _EnablePrecompute);
+ REGISTER_SETTING(*this, EnableDqReplicate);
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index e3dbf3320fe..11b15bbda02 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -27,6 +27,7 @@ struct TDqSettings {
static constexpr ui64 ChannelBufferSize = 2000_MB;
static constexpr ui64 OutputChunkMaxSize = 4_MB;
static constexpr ui64 ChunkSizeLimit = 128_MB;
+ static constexpr bool EnableDqReplicate = false;
};
using TPtr = std::shared_ptr<TDqSettings>;
@@ -69,6 +70,7 @@ struct TDqSettings {
NCommon::TConfSetting<bool, false> _OneGraphPerQuery;
NCommon::TConfSetting<TString, false> _FallbackOnRuntimeErrors;
NCommon::TConfSetting<bool, false> _EnablePrecompute;
+ NCommon::TConfSetting<bool, false> EnableDqReplicate;
NCommon::TConfSetting<TString, false> WorkerFilter;
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
index cdbcc43669e..b136bf2ab18 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
@@ -30,7 +30,10 @@ public:
, LogOptTransformer([state] () { return CreateDqsLogOptTransformer(state->TypeCtx, state->Settings); })
, PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings); })
, PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); })
- , TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer(state->TypeCtx); })
+ , TypeAnnotationTransformer([state] () {
+ return CreateDqsDataSinkTypeAnnotationTransformer(
+ state->TypeCtx, state->Settings->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate));
+ })
, RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); })
{ }
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
index 1c1af7bb077..48553e92527 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
@@ -13,7 +13,7 @@ namespace {
class TDqsDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
public:
- TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx)
+ TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate)
: TVisitorTransformerBase(true), TypeCtx(typeCtx)
{
AddHandler({TDqStage::CallableName()}, Hndl(&NDq::AnnotateDqStage));
@@ -26,7 +26,11 @@ public:
AddHandler({TDqCnBroadcast::CallableName()}, Hndl(&NDq::AnnotateDqConnection));
AddHandler({TDqCnValue::CallableName()}, Hndl(&NDq::AnnotateDqCnValue));
AddHandler({TDqCnMerge::CallableName()}, Hndl(&NDq::AnnotateDqCnMerge));
- AddHandler({TDqReplicate::CallableName()}, Hndl(&NDq::AnnotateDqReplicate));
+ if (enableDqReplicate) {
+ AddHandler({TDqReplicate::CallableName()}, Hndl(&NDq::AnnotateDqReplicate));
+ } else {
+ AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqReplicateAlwaysError));
+ }
AddHandler({TDqJoin::CallableName()}, Hndl(&NDq::AnnotateDqJoin));
AddHandler({TDqPhyMapJoin::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin));
AddHandler({TDqPhyCrossJoin::CallableName()}, Hndl(&NDq::AnnotateDqCrossJoin));
@@ -40,6 +44,11 @@ public:
}
private:
+ TStatus AnnotateDqReplicateAlwaysError(const TExprNode::TPtr& input, TExprContext& ctx) {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "DqReplicate is not supported by DQ"));
+ return TStatus::Error;
+ }
+
TStatus AnnotateDqWrite(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
if (!EnsureMinArgsCount(*input, 2, ctx)) {
return TStatus::Error;
@@ -74,8 +83,8 @@ private:
} // unnamed
-THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) {
- return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx));
+THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate) {
+ return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx, enableDqReplicate));
}
} // NYql \ No newline at end of file
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h
index df86def2cd4..5af1d1efa13 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h
@@ -7,6 +7,6 @@
namespace NYql {
-THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx);
+THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx, bool enableDqReplicate);
} // NYql