diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-12-19 17:52:52 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-12-19 17:52:52 +0300 |
commit | bc9bfa700f812cea3b635c19f46b6db627b6d41e (patch) | |
tree | abf826a0ace7b711751d29ee1980a8b84cb9e0ae | |
parent | 417bca7d9cf2fe1432e3354ade750c6187bbf396 (diff) | |
download | ydb-bc9bfa700f812cea3b635c19f46b6db627b6d41e.tar.gz |
Add sorted for DqCnMerge.
6 files changed, 57 insertions, 33 deletions
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp index fe84536e0c4..094c786fb53 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp @@ -24,9 +24,11 @@ namespace NYql { using namespace NNodes; +namespace { + class TDqDataProviderSink: public TDataProviderBase { public: - TDqDataProviderSink(const TDqStatePtr& state) + TDqDataProviderSink(const TDqState::TPtr& state) : State(state) , LogOptTransformer([state] () { return CreateDqsLogOptTransformer(state->TypeCtx, state->Settings); }) , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings, state->TypeCtx->UseBlocks ); }) @@ -35,7 +37,7 @@ public: return CreateDqsDataSinkTypeAnnotationTransformer( state->TypeCtx, state->Settings->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate)); }) - , ConstraintsTransformer([state] () { return CreateDqDataSinkConstraintTransformer(state); }) + , ConstraintsTransformer([] () { return CreateDqDataSinkConstraintTransformer(); }) , RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); }) { } @@ -275,7 +277,7 @@ public: } } - TDqStatePtr State; + const TDqState::TPtr State; TLazyInitHolder<IGraphTransformer> LogOptTransformer; TLazyInitHolder<IGraphTransformer> PhyOptTransformer; @@ -285,7 +287,9 @@ public: TLazyInitHolder<IGraphTransformer> RecaptureTransformer; }; -TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqStatePtr& state) { +} + +TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqState::TPtr& state) { return new TDqDataProviderSink(state); } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp index 306e230f8fb..e201a05d13c 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp @@ -11,11 +11,27 @@ using namespace NNodes; namespace { +template <class... Other> +struct TCopyConstraint; + +template <> +struct TCopyConstraint<> { + static void Do(const TExprNode&, const TExprNode::TPtr&) {} +}; + +template <class TConstraint, class... Other> +struct TCopyConstraint<TConstraint, Other...> { + static void Do(const TExprNode& from, const TExprNode::TPtr& to) { + if (const auto c = from.GetConstraint<TConstraint>()) + to->AddConstraint(c); + TCopyConstraint<Other...>::Do(from, to); + } +}; + class TDqDataSinkConstraintTransformer : public TVisitorTransformerBase { public: - TDqDataSinkConstraintTransformer(TDqState::TPtr state) + TDqDataSinkConstraintTransformer() : TVisitorTransformerBase(true) - , State_(std::move(state)) { AddHandler({TDqStage::CallableName(), TDqPhyStage::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleStage)); AddHandler({TDqOutput::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleOutput)); @@ -24,10 +40,10 @@ public: TDqCnBroadcast::CallableName(), TDqCnMap::CallableName(), TDqCnHashShuffle::CallableName(), - TDqCnMerge::CallableName(), TDqCnResult::CallableName(), TDqCnValue::CallableName() }, Hndl(&TDqDataSinkConstraintTransformer::HandleConnection)); + AddHandler({TDqCnMerge::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleMerge)); AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleReplicate)); AddHandler({ TDqJoin::CallableName(), @@ -61,16 +77,25 @@ public: if (const auto set = multi->GetItem(FromString<ui32>(output.Index().Value()))) input.Ptr()->SetConstraints(*set); } else - input.Ptr()->CopyConstraints(output.Stage().Program().Body().Ref()); // TODO: Copy onli limited set of constraints. + input.Ptr()->CopyConstraints(output.Stage().Program().Body().Ref()); return TStatus::Ok; } TStatus HandleConnection(TExprBase input, TExprContext&) { const auto output = input.Cast<TDqConnection>().Output(); - if (const auto u = output.Ref().GetConstraint<TUniqueConstraintNode>()) - input.Ptr()->AddConstraint(u); - if (const auto e = output.Ref().GetConstraint<TEmptyConstraintNode>()) - input.Ptr()->AddConstraint(e); + TCopyConstraint<TUniqueConstraintNode, TEmptyConstraintNode>::Do(output.Ref(), input.Ptr()); + return TStatus::Ok; + } + + TStatus HandleMerge(TExprBase input, TExprContext& ctx) { + const auto output = input.Cast<TDqCnMerge>().Output(); + if (const auto outSorted = output.Ref().GetConstraint<TSortedConstraintNode>()) + input.Ptr()->AddConstraint(outSorted); + else { + ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), "Expected sorted constraint on stage output.")); + return TStatus::Error; + } + TCopyConstraint<TUniqueConstraintNode, TEmptyConstraintNode>::Do(output.Ref(), input.Ptr()); return TStatus::Ok; } @@ -97,14 +122,12 @@ public: return TStatus::Ok; } -private: - const TDqState::TPtr State_; }; } -THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer(TDqState::TPtr state) { - return THolder<IGraphTransformer>(new TDqDataSinkConstraintTransformer(std::move(state))); +THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer() { + return THolder<IGraphTransformer>(new TDqDataSinkConstraintTransformer()); } } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h index 1f5976f958d..f78355f4c12 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h @@ -1,11 +1,9 @@ #pragma once -#include "yql_dq_state.h" - #include <ydb/library/yql/core/yql_graph_transformer.h> namespace NYql { -THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer(TDqState::TPtr state); +THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer(); } // NYql diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp index 6db7a0fe8ac..b617b7da7c0 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp @@ -35,16 +35,18 @@ using namespace NKikimr::NMiniKQL; using namespace NNodes; using namespace NDq; +namespace { + class TDqDataProviderSource: public TDataProviderBase { public: - TDqDataProviderSource(const TDqStatePtr& state, TExecTransformerFactory execTransformerFactory) + TDqDataProviderSource(const TDqState::TPtr& state, TExecTransformerFactory execTransformerFactory) : State(state) , ConfigurationTransformer([this]() { return MakeHolder<NCommon::TProviderConfigurationTransformer>(State->Settings, *State->TypeCtx, TString{DqProviderName}); }) , ExecTransformer([this, execTransformerFactory] () { return THolder<IGraphTransformer>(execTransformerFactory(State)); }) , TypeAnnotationTransformer([] () { return CreateDqsDataSourceTypeAnnotationTransformer(); }) - , ConstraintsTransformer([state] () { return CreateDqDataSourceConstraintTransformer(state); }) + , ConstraintsTransformer([] () { return CreateDqDataSourceConstraintTransformer(); }) { } TStringBuf GetName() const override { @@ -233,18 +235,21 @@ public: if (ExecTransformer) { ExecTransformer->Rewind(); TypeAnnotationTransformer->Rewind(); + ConstraintsTransformer->Rewind(); } } private: - TDqStatePtr State; + const TDqState::TPtr State; TLazyInitHolder<IGraphTransformer> ConfigurationTransformer; TLazyInitHolder<IGraphTransformer> ExecTransformer; TLazyInitHolder<TVisitorTransformerBase> TypeAnnotationTransformer; TLazyInitHolder<IGraphTransformer> ConstraintsTransformer; }; -TIntrusivePtr<IDataProvider> CreateDqDataSource(const TDqStatePtr& state, TExecTransformerFactory execTransformerFactory) { +} + +TIntrusivePtr<IDataProvider> CreateDqDataSource(const TDqState::TPtr& state, TExecTransformerFactory execTransformerFactory) { return new TDqDataProviderSource(state, execTransformerFactory); } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp index 8dc3a60870f..914c9156259 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp @@ -13,9 +13,8 @@ namespace { class TDqDataSourceConstraintTransformer : public TVisitorTransformerBase { public: - TDqDataSourceConstraintTransformer(TDqState::TPtr state) + TDqDataSourceConstraintTransformer() : TVisitorTransformerBase(true) - , State_(std::move(state)) { AddHandler({ TCoConfigure::CallableName(), @@ -31,14 +30,12 @@ public: TStatus HandleDefault(TExprBase, TExprContext&) { return TStatus::Ok; } -private: - const TDqState::TPtr State_; }; } -THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer(TDqState::TPtr state) { - return THolder<IGraphTransformer>(new TDqDataSourceConstraintTransformer(std::move(state))); +THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer() { + return THolder<IGraphTransformer>(new TDqDataSourceConstraintTransformer()); } } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h index 22aa5716e9c..06639abfe45 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h @@ -1,12 +1,9 @@ #pragma once -#include "yql_dq_state.h" - #include <ydb/library/yql/core/yql_graph_transformer.h> namespace NYql { -THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer(TDqState::TPtr state); +THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer(); } // NYql - |