diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-12-13 16:28:35 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-12-13 16:28:35 +0300 |
commit | d60a1bdddafea6dea38cdbcbb74adabb7689999d (patch) | |
tree | 634c1fddb85bd141ef838cbd899e26c7ad93001d | |
parent | f2bea70bea01921ec43846224d100f2c70dd5719 (diff) | |
download | ydb-d60a1bdddafea6dea38cdbcbb74adabb7689999d.tar.gz |
Initial support for constraints in DQ.
9 files changed, 202 insertions, 0 deletions
diff --git a/ydb/library/yql/ast/yql_constraint.h b/ydb/library/yql/ast/yql_constraint.h index ec5928d6f1e..6b9a0b570b8 100644 --- a/ydb/library/yql/ast/yql_constraint.h +++ b/ydb/library/yql/ast/yql_constraint.h @@ -119,6 +119,10 @@ public: Constraints_.clear(); } + explicit operator bool() const { + return !Constraints_.empty(); + } + bool operator ==(const TConstraintSet& s) const { return Constraints_ == s.Constraints_; } diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.txt index 79c518f259f..80deccd07a3 100644 --- a/ydb/library/yql/providers/dq/provider/CMakeLists.txt +++ b/ydb/library/yql/providers/dq/provider/CMakeLists.txt @@ -42,7 +42,9 @@ target_link_libraries(providers-dq-provider PUBLIC ) target_sources(providers-dq-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp index f8fdd0b63e1..fe84536e0c4 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp @@ -1,5 +1,6 @@ #include "yql_dq_datasink.h" #include "yql_dq_state.h" +#include "yql_dq_datasink_constraints.h" #include "yql_dq_datasink_type_ann.h" #include "yql_dq_recapture.h" @@ -34,6 +35,7 @@ public: return CreateDqsDataSinkTypeAnnotationTransformer( state->TypeCtx, 
state->Settings->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate)); }) + , ConstraintsTransformer([state] () { return CreateDqDataSinkConstraintTransformer(state); }) , RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); }) { } @@ -189,6 +191,11 @@ public: return *TypeAnnotationTransformer; } + IGraphTransformer& GetConstraintTransformer(bool instantOnly, bool subGraph) override { + Y_UNUSED(instantOnly && subGraph); + return *ConstraintsTransformer; + } + IGraphTransformer& GetRecaptureOptProposalTransformer() override { return *RecaptureTransformer; } @@ -274,6 +281,7 @@ public: TLazyInitHolder<IGraphTransformer> PhyOptTransformer; TLazyInitHolder<IGraphTransformer> PhysicalFinalizingTransformer; TLazyInitHolder<TVisitorTransformerBase> TypeAnnotationTransformer; + TLazyInitHolder<IGraphTransformer> ConstraintsTransformer; TLazyInitHolder<IGraphTransformer> RecaptureTransformer; }; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp new file mode 100644 index 00000000000..306e230f8fb --- /dev/null +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp @@ -0,0 +1,111 @@ +#include "yql_dq_state.h" + +#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> +#include <ydb/library/yql/providers/common/transform/yql_visit.h> +#include <ydb/library/yql/core/yql_expr_constraint.h> +#include <ydb/library/yql/ast/yql_constraint.h> + +namespace NYql { + +using namespace NNodes; + +namespace { + +class TDqDataSinkConstraintTransformer : public TVisitorTransformerBase { +public: + TDqDataSinkConstraintTransformer(TDqState::TPtr state) + : TVisitorTransformerBase(true) + , State_(std::move(state)) + { + AddHandler({TDqStage::CallableName(), TDqPhyStage::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleStage)); + AddHandler({TDqOutput::CallableName()}, 
Hndl(&TDqDataSinkConstraintTransformer::HandleOutput)); + AddHandler({ + TDqCnUnionAll::CallableName(), + TDqCnBroadcast::CallableName(), + TDqCnMap::CallableName(), + TDqCnHashShuffle::CallableName(), + TDqCnMerge::CallableName(), + TDqCnResult::CallableName(), + TDqCnValue::CallableName() + }, Hndl(&TDqDataSinkConstraintTransformer::HandleConnection)); + AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleReplicate)); + AddHandler({ + TDqJoin::CallableName(), + TDqPhyMapJoin::CallableName(), + TDqPhyCrossJoin::CallableName(), + TDqPhyJoinDict::CallableName(), + TDqSink::CallableName(), + TDqWrite::CallableName(), + TDqQuery::CallableName(), + TDqPrecompute::CallableName(), + TDqPhyPrecompute::CallableName(), + TDqTransform::CallableName() + }, Hndl(&TDqDataSinkConstraintTransformer::HandleDefault)); + } + + TStatus HandleDefault(TExprBase, TExprContext&) { + return TStatus::Ok; + } + + TStatus HandleStage(TExprBase input, TExprContext& ctx) { + const auto stage = input.Cast<TDqStageBase>(); + TSmallVec<TConstraintNode::TListType> argConstraints(stage.Inputs().Size()); + for (auto i = 0U; i < argConstraints.size(); ++i) + argConstraints[i] = stage.Inputs().Item(i).Ref().GetAllConstraints(); + return UpdateLambdaConstraints(stage.Ptr()->ChildRef(TDqStageBase::idx_Program), ctx, argConstraints); + } + + TStatus HandleOutput(TExprBase input, TExprContext&) { + const auto output = input.Cast<TDqOutput>(); + if (const auto multi = output.Stage().Program().Body().Ref().GetConstraint<TMultiConstraintNode>()) { + if (const auto set = multi->GetItem(FromString<ui32>(output.Index().Value()))) + input.Ptr()->SetConstraints(*set); + } else + input.Ptr()->CopyConstraints(output.Stage().Program().Body().Ref()); // TODO: Copy onli limited set of constraints. 
+ return TStatus::Ok; + } + + TStatus HandleConnection(TExprBase input, TExprContext&) { + const auto output = input.Cast<TDqConnection>().Output(); + if (const auto u = output.Ref().GetConstraint<TUniqueConstraintNode>()) + input.Ptr()->AddConstraint(u); + if (const auto e = output.Ref().GetConstraint<TEmptyConstraintNode>()) + input.Ptr()->AddConstraint(e); + return TStatus::Ok; + } + + TStatus HandleReplicate(TExprBase input, TExprContext& ctx) { + const auto replicate = input.Cast<TDqReplicate>(); + TSmallVec<TConstraintNode::TListType> argConstraints(1U, replicate.Input().Ref().GetAllConstraints()); + TStatus status = TStatus::Ok; + + for (auto i = 1U; i < replicate.Ref().ChildrenSize(); ++i) + status = status.Combine(UpdateLambdaConstraints(replicate.Ptr()->ChildRef(i), ctx, argConstraints)); + + if (status != TStatus::Ok) + return status; + + TMultiConstraintNode::TMapType map; + map.reserve(replicate.Ref().ChildrenSize() - 1U); + + for (auto i = 1U; i < replicate.Ref().ChildrenSize(); ++i) + if (auto constraints = replicate.Ref().Child(i)->Tail().GetConstraintSet()) + map.insert_unique(std::make_pair(i - 1U, std::move(constraints))); + + if (!map.empty()) + input.Ptr()->AddConstraint(ctx.MakeConstraint<TMultiConstraintNode>(std::move(map))); + + return TStatus::Ok; + } +private: + const TDqState::TPtr State_; +}; + +} + +THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer(TDqState::TPtr state) { + return THolder<IGraphTransformer>(new TDqDataSinkConstraintTransformer(std::move(state))); +} + +} + diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h new file mode 100644 index 00000000000..1f5976f958d --- /dev/null +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h @@ -0,0 +1,11 @@ +#pragma once + +#include "yql_dq_state.h" + +#include <ydb/library/yql/core/yql_graph_transformer.h> + +namespace NYql { + 
+THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer(TDqState::TPtr state); + +} // NYql diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp index 22ea4dad940..6db7a0fe8ac 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp @@ -1,4 +1,5 @@ #include "yql_dq_datasource.h" +#include "yql_dq_datasource_constraints.h" #include "yql_dq_datasource_type_ann.h" #include "yql_dq_state.h" @@ -43,6 +44,7 @@ public: }) , ExecTransformer([this, execTransformerFactory] () { return THolder<IGraphTransformer>(execTransformerFactory(State)); }) , TypeAnnotationTransformer([] () { return CreateDqsDataSourceTypeAnnotationTransformer(); }) + , ConstraintsTransformer([state] () { return CreateDqDataSourceConstraintTransformer(state); }) { } TStringBuf GetName() const override { @@ -54,6 +56,11 @@ public: return *TypeAnnotationTransformer; } + IGraphTransformer& GetConstraintTransformer(bool instantOnly, bool subGraph) override { + Y_UNUSED(instantOnly && subGraph); + return *ConstraintsTransformer; + } + IGraphTransformer& GetConfigurationTransformer() override { return *ConfigurationTransformer; } @@ -234,6 +241,7 @@ private: TLazyInitHolder<IGraphTransformer> ConfigurationTransformer; TLazyInitHolder<IGraphTransformer> ExecTransformer; TLazyInitHolder<TVisitorTransformerBase> TypeAnnotationTransformer; + TLazyInitHolder<IGraphTransformer> ConstraintsTransformer; }; TIntrusivePtr<IDataProvider> CreateDqDataSource(const TDqStatePtr& state, TExecTransformerFactory execTransformerFactory) { diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp new file mode 100644 index 00000000000..8dc3a60870f --- /dev/null +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp @@ -0,0 +1,44 @@ 
+#include "yql_dq_state.h" + +#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> +#include <ydb/library/yql/providers/common/transform/yql_visit.h> +#include <ydb/library/yql/core/yql_expr_constraint.h> +#include <ydb/library/yql/ast/yql_constraint.h> + +namespace NYql { + +using namespace NNodes; + +namespace { + +class TDqDataSourceConstraintTransformer : public TVisitorTransformerBase { +public: + TDqDataSourceConstraintTransformer(TDqState::TPtr state) + : TVisitorTransformerBase(true) + , State_(std::move(state)) + { + AddHandler({ + TCoConfigure::CallableName(), + TDqReadWrap::CallableName(), + TDqReadWideWrap::CallableName(), + TDqSource::CallableName(), + TDqSourceWrap::CallableName(), + TDqSourceWideWrap::CallableName(), + TDqSourceWideBlockWrap::CallableName() + }, Hndl(&TDqDataSourceConstraintTransformer::HandleDefault)); + } + + TStatus HandleDefault(TExprBase, TExprContext&) { + return TStatus::Ok; + } +private: + const TDqState::TPtr State_; +}; + +} + +THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer(TDqState::TPtr state) { + return THolder<IGraphTransformer>(new TDqDataSourceConstraintTransformer(std::move(state))); +} + +} diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h new file mode 100644 index 00000000000..22aa5716e9c --- /dev/null +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h @@ -0,0 +1,12 @@ +#pragma once + +#include "yql_dq_state.h" + +#include <ydb/library/yql/core/yql_graph_transformer.h> + +namespace NYql { + +THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer(TDqState::TPtr state); + +} // NYql + diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_state.h b/ydb/library/yql/providers/dq/provider/yql_dq_state.h index 4c4ef093517..5a369805efc 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_state.h +++ 
b/ydb/library/yql/providers/dq/provider/yql_dq_state.h @@ -15,6 +15,8 @@ namespace NYql { using namespace NDqs; // TODO: remove this namespace; struct TDqState: public TThrRefBase { + using TPtr = TIntrusivePtr<TDqState>; + IDqGateway::TPtr DqGateway; const TGatewaysConfig* GatewaysConfig; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; |