aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author a-romanov <Anton.Romanov@ydb.tech> 2022-12-13 16:28:35 +0300
committer a-romanov <Anton.Romanov@ydb.tech> 2022-12-13 16:28:35 +0300
commit d60a1bdddafea6dea38cdbcbb74adabb7689999d (patch)
tree 634c1fddb85bd141ef838cbd899e26c7ad93001d
parent f2bea70bea01921ec43846224d100f2c70dd5719 (diff)
download ydb-d60a1bdddafea6dea38cdbcbb74adabb7689999d.tar.gz
Initial support constraints in DQ.
-rw-r--r-- ydb/library/yql/ast/yql_constraint.h 4
-rw-r--r-- ydb/library/yql/providers/dq/provider/CMakeLists.txt 2
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp 8
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp 111
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h 11
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp 8
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp 44
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h 12
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_state.h 2
9 files changed, 202 insertions, 0 deletions
diff --git a/ydb/library/yql/ast/yql_constraint.h b/ydb/library/yql/ast/yql_constraint.h
index ec5928d6f1e..6b9a0b570b8 100644
--- a/ydb/library/yql/ast/yql_constraint.h
+++ b/ydb/library/yql/ast/yql_constraint.h
@@ -119,6 +119,10 @@ public:
Constraints_.clear();
}
+ explicit operator bool() const { // True when Constraints_ is non-empty; explicit, so it only applies in boolean contexts.
+ return !Constraints_.empty();
+ }
+
bool operator ==(const TConstraintSet& s) const {
return Constraints_ == s.Constraints_;
}
diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.txt
index 79c518f259f..80deccd07a3 100644
--- a/ydb/library/yql/providers/dq/provider/CMakeLists.txt
+++ b/ydb/library/yql/providers/dq/provider/CMakeLists.txt
@@ -42,7 +42,9 @@ target_link_libraries(providers-dq-provider PUBLIC
)
target_sources(providers-dq-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
index f8fdd0b63e1..fe84536e0c4 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
@@ -1,5 +1,6 @@
#include "yql_dq_datasink.h"
#include "yql_dq_state.h"
+#include "yql_dq_datasink_constraints.h"
#include "yql_dq_datasink_type_ann.h"
#include "yql_dq_recapture.h"
@@ -34,6 +35,7 @@ public:
return CreateDqsDataSinkTypeAnnotationTransformer(
state->TypeCtx, state->Settings->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate));
})
+ , ConstraintsTransformer([state] () { return CreateDqDataSinkConstraintTransformer(state); })
, RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); })
{ }
@@ -189,6 +191,11 @@ public:
return *TypeAnnotationTransformer;
}
+ IGraphTransformer& GetConstraintTransformer(bool instantOnly, bool subGraph) override { // Returns the transformer held by the ConstraintsTransformer member.
+ Y_UNUSED(instantOnly && subGraph); // Neither flag affects the result; the same transformer is returned for every combination.
+ return *ConstraintsTransformer;
+ }
+
IGraphTransformer& GetRecaptureOptProposalTransformer() override {
return *RecaptureTransformer;
}
@@ -274,6 +281,7 @@ public:
TLazyInitHolder<IGraphTransformer> PhyOptTransformer;
TLazyInitHolder<IGraphTransformer> PhysicalFinalizingTransformer;
TLazyInitHolder<TVisitorTransformerBase> TypeAnnotationTransformer;
+ TLazyInitHolder<IGraphTransformer> ConstraintsTransformer;
TLazyInitHolder<IGraphTransformer> RecaptureTransformer;
};
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp
new file mode 100644
index 00000000000..306e230f8fb
--- /dev/null
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp
@@ -0,0 +1,111 @@
+#include "yql_dq_state.h"
+
+#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
+#include <ydb/library/yql/providers/common/transform/yql_visit.h>
+#include <ydb/library/yql/core/yql_expr_constraint.h>
+#include <ydb/library/yql/ast/yql_constraint.h>
+
+namespace NYql {
+
+using namespace NNodes;
+
+namespace {
+
+class TDqDataSinkConstraintTransformer : public TVisitorTransformerBase { // Visitor transformer that assigns constraints to DQ data sink callables, one handler per callable group.
+public:
+ TDqDataSinkConstraintTransformer(TDqState::TPtr state) // Registers a handler for every callable name this transformer processes.
+ : TVisitorTransformerBase(true) // NOTE(review): the meaning of the `true` flag is defined by TVisitorTransformerBase, not visible here - confirm.
+ , State_(std::move(state))
+ {
+ AddHandler({TDqStage::CallableName(), TDqPhyStage::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleStage)); // Both stage kinds share HandleStage.
+ AddHandler({TDqOutput::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleOutput));
+ AddHandler({ // Every connection kind listed here is routed to HandleConnection.
+ TDqCnUnionAll::CallableName(),
+ TDqCnBroadcast::CallableName(),
+ TDqCnMap::CallableName(),
+ TDqCnHashShuffle::CallableName(),
+ TDqCnMerge::CallableName(),
+ TDqCnResult::CallableName(),
+ TDqCnValue::CallableName()
+ }, Hndl(&TDqDataSinkConstraintTransformer::HandleConnection));
+ AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleReplicate));
+ AddHandler({ // Callables below are registered with HandleDefault, which sets no constraints.
+ TDqJoin::CallableName(),
+ TDqPhyMapJoin::CallableName(),
+ TDqPhyCrossJoin::CallableName(),
+ TDqPhyJoinDict::CallableName(),
+ TDqSink::CallableName(),
+ TDqWrite::CallableName(),
+ TDqQuery::CallableName(),
+ TDqPrecompute::CallableName(),
+ TDqPhyPrecompute::CallableName(),
+ TDqTransform::CallableName()
+ }, Hndl(&TDqDataSinkConstraintTransformer::HandleDefault));
+ }
+
+ TStatus HandleDefault(TExprBase, TExprContext&) { // Ignores both arguments, leaves the node untouched and reports Ok.
+ return TStatus::Ok;
+ }
+
+ TStatus HandleStage(TExprBase input, TExprContext& ctx) { // Feeds the constraints of every stage input into the stage program lambda.
+ const auto stage = input.Cast<TDqStageBase>();
+ TSmallVec<TConstraintNode::TListType> argConstraints(stage.Inputs().Size()); // One constraint list per stage input, in input order.
+ for (auto i = 0U; i < argConstraints.size(); ++i)
+ argConstraints[i] = stage.Inputs().Item(i).Ref().GetAllConstraints();
+ return UpdateLambdaConstraints(stage.Ptr()->ChildRef(TDqStageBase::idx_Program), ctx, argConstraints); // The returned status is passed through unchanged.
+ }
+
+ TStatus HandleOutput(TExprBase input, TExprContext&) { // Derives the output's constraints from the body of the owning stage's program; always reports Ok.
+ const auto output = input.Cast<TDqOutput>();
+ if (const auto multi = output.Stage().Program().Body().Ref().GetConstraint<TMultiConstraintNode>()) { // Body has a multi constraint: use only the item matching this output's index.
+ if (const auto set = multi->GetItem(FromString<ui32>(output.Index().Value()))) // When there is no item for the index, no constraints are set.
+ input.Ptr()->SetConstraints(*set);
+ } else
+ input.Ptr()->CopyConstraints(output.Stage().Program().Body().Ref()); // TODO: Copy only a limited set of constraints.
+ return TStatus::Ok;
+ }
+
+ TStatus HandleConnection(TExprBase input, TExprContext&) { // Forwards only the Unique and Empty constraints of the connection's output; always reports Ok.
+ const auto output = input.Cast<TDqConnection>().Output();
+ if (const auto u = output.Ref().GetConstraint<TUniqueConstraintNode>())
+ input.Ptr()->AddConstraint(u);
+ if (const auto e = output.Ref().GetConstraint<TEmptyConstraintNode>())
+ input.Ptr()->AddConstraint(e);
+ return TStatus::Ok;
+ }
+
+ TStatus HandleReplicate(TExprBase input, TExprContext& ctx) { // Updates every lambda child with the input's constraints, then packs the lambdas' results into a multi constraint.
+ const auto replicate = input.Cast<TDqReplicate>();
+ TSmallVec<TConstraintNode::TListType> argConstraints(1U, replicate.Input().Ref().GetAllConstraints()); // Single argument list taken from the replicate input.
+ TStatus status = TStatus::Ok;
+
+ for (auto i = 1U; i < replicate.Ref().ChildrenSize(); ++i) // Children from index 1 onward are processed as lambdas.
+ status = status.Combine(UpdateLambdaConstraints(replicate.Ptr()->ChildRef(i), ctx, argConstraints));
+
+ if (status != TStatus::Ok) // Any combined non-Ok status is returned before the multi constraint is built.
+ return status;
+
+ TMultiConstraintNode::TMapType map;
+ map.reserve(replicate.Ref().ChildrenSize() - 1U);
+
+ for (auto i = 1U; i < replicate.Ref().ChildrenSize(); ++i)
+ if (auto constraints = replicate.Ref().Child(i)->Tail().GetConstraintSet()) // Sets that convert to false are skipped.
+ map.insert_unique(std::make_pair(i - 1U, std::move(constraints))); // Key is the child index minus one.
+
+ if (!map.empty()) // No multi constraint is added when no lambda contributed a set.
+ input.Ptr()->AddConstraint(ctx.MakeConstraint<TMultiConstraintNode>(std::move(map)));
+
+ return TStatus::Ok;
+ }
+private:
+ const TDqState::TPtr State_; // Stored by the constructor; not read by any handler in this class.
+};
+
+}
+
+THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer(TDqState::TPtr state) { // Factory: builds a TDqDataSinkConstraintTransformer and returns it as an IGraphTransformer holder.
+ return THolder<IGraphTransformer>(new TDqDataSinkConstraintTransformer(std::move(state))); // The state pointer is moved into the new transformer.
+}
+
+}
+
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h
new file mode 100644
index 00000000000..1f5976f958d
--- /dev/null
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include "yql_dq_state.h"
+
+#include <ydb/library/yql/core/yql_graph_transformer.h>
+
+namespace NYql {
+
+THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer(TDqState::TPtr state); // Creates the constraint transformer for the DQ data sink; the caller owns the returned holder.
+
+} // NYql
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
index 22ea4dad940..6db7a0fe8ac 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
@@ -1,4 +1,5 @@
#include "yql_dq_datasource.h"
+#include "yql_dq_datasource_constraints.h"
#include "yql_dq_datasource_type_ann.h"
#include "yql_dq_state.h"
@@ -43,6 +44,7 @@ public:
})
, ExecTransformer([this, execTransformerFactory] () { return THolder<IGraphTransformer>(execTransformerFactory(State)); })
, TypeAnnotationTransformer([] () { return CreateDqsDataSourceTypeAnnotationTransformer(); })
+ , ConstraintsTransformer([state] () { return CreateDqDataSourceConstraintTransformer(state); })
{ }
TStringBuf GetName() const override {
@@ -54,6 +56,11 @@ public:
return *TypeAnnotationTransformer;
}
+ IGraphTransformer& GetConstraintTransformer(bool instantOnly, bool subGraph) override { // Returns the transformer held by the ConstraintsTransformer member.
+ Y_UNUSED(instantOnly && subGraph); // Neither flag affects the result; the same transformer is returned for every combination.
+ return *ConstraintsTransformer;
+ }
+
IGraphTransformer& GetConfigurationTransformer() override {
return *ConfigurationTransformer;
}
@@ -234,6 +241,7 @@ private:
TLazyInitHolder<IGraphTransformer> ConfigurationTransformer;
TLazyInitHolder<IGraphTransformer> ExecTransformer;
TLazyInitHolder<TVisitorTransformerBase> TypeAnnotationTransformer;
+ TLazyInitHolder<IGraphTransformer> ConstraintsTransformer;
};
TIntrusivePtr<IDataProvider> CreateDqDataSource(const TDqStatePtr& state, TExecTransformerFactory execTransformerFactory) {
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp
new file mode 100644
index 00000000000..8dc3a60870f
--- /dev/null
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp
@@ -0,0 +1,44 @@
+#include "yql_dq_state.h"
+
+#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
+#include <ydb/library/yql/providers/common/transform/yql_visit.h>
+#include <ydb/library/yql/core/yql_expr_constraint.h>
+#include <ydb/library/yql/ast/yql_constraint.h>
+
+namespace NYql {
+
+using namespace NNodes;
+
+namespace {
+
+class TDqDataSourceConstraintTransformer : public TVisitorTransformerBase { // Visitor transformer for DQ data source callables; every registered callable is accepted without setting constraints.
+public:
+ TDqDataSourceConstraintTransformer(TDqState::TPtr state) // Registers HandleDefault for every callable name listed below.
+ : TVisitorTransformerBase(true) // NOTE(review): the meaning of the `true` flag is defined by TVisitorTransformerBase, not visible here - confirm.
+ , State_(std::move(state))
+ {
+ AddHandler({ // All of these callables share the single no-op handler.
+ TCoConfigure::CallableName(),
+ TDqReadWrap::CallableName(),
+ TDqReadWideWrap::CallableName(),
+ TDqSource::CallableName(),
+ TDqSourceWrap::CallableName(),
+ TDqSourceWideWrap::CallableName(),
+ TDqSourceWideBlockWrap::CallableName()
+ }, Hndl(&TDqDataSourceConstraintTransformer::HandleDefault));
+ }
+
+ TStatus HandleDefault(TExprBase, TExprContext&) { // Ignores both arguments, leaves the node untouched and reports Ok.
+ return TStatus::Ok;
+ }
+private:
+ const TDqState::TPtr State_; // Stored by the constructor; not read anywhere in this class.
+};
+
+}
+
+THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer(TDqState::TPtr state) { // Factory: builds a TDqDataSourceConstraintTransformer and returns it as an IGraphTransformer holder.
+ return THolder<IGraphTransformer>(new TDqDataSourceConstraintTransformer(std::move(state))); // The state pointer is moved into the new transformer.
+}
+
+}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h
new file mode 100644
index 00000000000..22aa5716e9c
--- /dev/null
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include "yql_dq_state.h"
+
+#include <ydb/library/yql/core/yql_graph_transformer.h>
+
+namespace NYql {
+
+THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer(TDqState::TPtr state); // Creates the constraint transformer for the DQ data source; the caller owns the returned holder.
+
+} // NYql
+
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_state.h b/ydb/library/yql/providers/dq/provider/yql_dq_state.h
index 4c4ef093517..5a369805efc 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_state.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_state.h
@@ -15,6 +15,8 @@ namespace NYql {
using namespace NDqs; // TODO: remove this namespace;
struct TDqState: public TThrRefBase {
+ using TPtr = TIntrusivePtr<TDqState>;
+
IDqGateway::TPtr DqGateway;
const TGatewaysConfig* GatewaysConfig;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;