aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-12-19 17:52:52 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-12-19 17:52:52 +0300
commitbc9bfa700f812cea3b635c19f46b6db627b6d41e (patch)
treeabf826a0ace7b711751d29ee1980a8b84cb9e0ae
parent417bca7d9cf2fe1432e3354ade750c6187bbf396 (diff)
downloadydb-bc9bfa700f812cea3b635c19f46b6db627b6d41e.tar.gz
Add sorted for DqCnMerge.
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp12
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp47
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h4
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp13
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp9
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h5
6 files changed, 57 insertions, 33 deletions
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
index fe84536e0c4..094c786fb53 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
@@ -24,9 +24,11 @@ namespace NYql {
using namespace NNodes;
+namespace {
+
class TDqDataProviderSink: public TDataProviderBase {
public:
- TDqDataProviderSink(const TDqStatePtr& state)
+ TDqDataProviderSink(const TDqState::TPtr& state)
: State(state)
, LogOptTransformer([state] () { return CreateDqsLogOptTransformer(state->TypeCtx, state->Settings); })
, PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings, state->TypeCtx->UseBlocks ); })
@@ -35,7 +37,7 @@ public:
return CreateDqsDataSinkTypeAnnotationTransformer(
state->TypeCtx, state->Settings->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate));
})
- , ConstraintsTransformer([state] () { return CreateDqDataSinkConstraintTransformer(state); })
+ , ConstraintsTransformer([] () { return CreateDqDataSinkConstraintTransformer(); })
, RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); })
{ }
@@ -275,7 +277,7 @@ public:
}
}
- TDqStatePtr State;
+ const TDqState::TPtr State;
TLazyInitHolder<IGraphTransformer> LogOptTransformer;
TLazyInitHolder<IGraphTransformer> PhyOptTransformer;
@@ -285,7 +287,9 @@ public:
TLazyInitHolder<IGraphTransformer> RecaptureTransformer;
};
-TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqStatePtr& state) {
+}
+
+TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqState::TPtr& state) {
return new TDqDataProviderSink(state);
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp
index 306e230f8fb..e201a05d13c 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp
@@ -11,11 +11,27 @@ using namespace NNodes;
namespace {
+template <class... Other>
+struct TCopyConstraint;
+
+template <>
+struct TCopyConstraint<> {
+ static void Do(const TExprNode&, const TExprNode::TPtr&) {}
+};
+
+template <class TConstraint, class... Other>
+struct TCopyConstraint<TConstraint, Other...> {
+ static void Do(const TExprNode& from, const TExprNode::TPtr& to) {
+ if (const auto c = from.GetConstraint<TConstraint>())
+ to->AddConstraint(c);
+ TCopyConstraint<Other...>::Do(from, to);
+ }
+};
+
class TDqDataSinkConstraintTransformer : public TVisitorTransformerBase {
public:
- TDqDataSinkConstraintTransformer(TDqState::TPtr state)
+ TDqDataSinkConstraintTransformer()
: TVisitorTransformerBase(true)
- , State_(std::move(state))
{
AddHandler({TDqStage::CallableName(), TDqPhyStage::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleStage));
AddHandler({TDqOutput::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleOutput));
@@ -24,10 +40,10 @@ public:
TDqCnBroadcast::CallableName(),
TDqCnMap::CallableName(),
TDqCnHashShuffle::CallableName(),
- TDqCnMerge::CallableName(),
TDqCnResult::CallableName(),
TDqCnValue::CallableName()
}, Hndl(&TDqDataSinkConstraintTransformer::HandleConnection));
+ AddHandler({TDqCnMerge::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleMerge));
AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleReplicate));
AddHandler({
TDqJoin::CallableName(),
@@ -61,16 +77,25 @@ public:
if (const auto set = multi->GetItem(FromString<ui32>(output.Index().Value())))
input.Ptr()->SetConstraints(*set);
} else
- input.Ptr()->CopyConstraints(output.Stage().Program().Body().Ref()); // TODO: Copy onli limited set of constraints.
+ input.Ptr()->CopyConstraints(output.Stage().Program().Body().Ref());
return TStatus::Ok;
}
TStatus HandleConnection(TExprBase input, TExprContext&) {
const auto output = input.Cast<TDqConnection>().Output();
- if (const auto u = output.Ref().GetConstraint<TUniqueConstraintNode>())
- input.Ptr()->AddConstraint(u);
- if (const auto e = output.Ref().GetConstraint<TEmptyConstraintNode>())
- input.Ptr()->AddConstraint(e);
+ TCopyConstraint<TUniqueConstraintNode, TEmptyConstraintNode>::Do(output.Ref(), input.Ptr());
+ return TStatus::Ok;
+ }
+
+ TStatus HandleMerge(TExprBase input, TExprContext& ctx) {
+ const auto output = input.Cast<TDqCnMerge>().Output();
+ if (const auto outSorted = output.Ref().GetConstraint<TSortedConstraintNode>())
+ input.Ptr()->AddConstraint(outSorted);
+ else {
+ ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), "Expected sorted constraint on stage output."));
+ return TStatus::Error;
+ }
+ TCopyConstraint<TUniqueConstraintNode, TEmptyConstraintNode>::Do(output.Ref(), input.Ptr());
return TStatus::Ok;
}
@@ -97,14 +122,12 @@ public:
return TStatus::Ok;
}
-private:
- const TDqState::TPtr State_;
};
}
-THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer(TDqState::TPtr state) {
- return THolder<IGraphTransformer>(new TDqDataSinkConstraintTransformer(std::move(state)));
+THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer() {
+ return THolder<IGraphTransformer>(new TDqDataSinkConstraintTransformer());
}
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h
index 1f5976f958d..f78355f4c12 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.h
@@ -1,11 +1,9 @@
#pragma once
-#include "yql_dq_state.h"
-
#include <ydb/library/yql/core/yql_graph_transformer.h>
namespace NYql {
-THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer(TDqState::TPtr state);
+THolder<IGraphTransformer> CreateDqDataSinkConstraintTransformer();
} // NYql
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
index 6db7a0fe8ac..b617b7da7c0 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
@@ -35,16 +35,18 @@ using namespace NKikimr::NMiniKQL;
using namespace NNodes;
using namespace NDq;
+namespace {
+
class TDqDataProviderSource: public TDataProviderBase {
public:
- TDqDataProviderSource(const TDqStatePtr& state, TExecTransformerFactory execTransformerFactory)
+ TDqDataProviderSource(const TDqState::TPtr& state, TExecTransformerFactory execTransformerFactory)
: State(state)
, ConfigurationTransformer([this]() {
return MakeHolder<NCommon::TProviderConfigurationTransformer>(State->Settings, *State->TypeCtx, TString{DqProviderName});
})
, ExecTransformer([this, execTransformerFactory] () { return THolder<IGraphTransformer>(execTransformerFactory(State)); })
, TypeAnnotationTransformer([] () { return CreateDqsDataSourceTypeAnnotationTransformer(); })
- , ConstraintsTransformer([state] () { return CreateDqDataSourceConstraintTransformer(state); })
+ , ConstraintsTransformer([] () { return CreateDqDataSourceConstraintTransformer(); })
{ }
TStringBuf GetName() const override {
@@ -233,18 +235,21 @@ public:
if (ExecTransformer) {
ExecTransformer->Rewind();
TypeAnnotationTransformer->Rewind();
+ ConstraintsTransformer->Rewind();
}
}
private:
- TDqStatePtr State;
+ const TDqState::TPtr State;
TLazyInitHolder<IGraphTransformer> ConfigurationTransformer;
TLazyInitHolder<IGraphTransformer> ExecTransformer;
TLazyInitHolder<TVisitorTransformerBase> TypeAnnotationTransformer;
TLazyInitHolder<IGraphTransformer> ConstraintsTransformer;
};
-TIntrusivePtr<IDataProvider> CreateDqDataSource(const TDqStatePtr& state, TExecTransformerFactory execTransformerFactory) {
+}
+
+TIntrusivePtr<IDataProvider> CreateDqDataSource(const TDqState::TPtr& state, TExecTransformerFactory execTransformerFactory) {
return new TDqDataProviderSource(state, execTransformerFactory);
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp
index 8dc3a60870f..914c9156259 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.cpp
@@ -13,9 +13,8 @@ namespace {
class TDqDataSourceConstraintTransformer : public TVisitorTransformerBase {
public:
- TDqDataSourceConstraintTransformer(TDqState::TPtr state)
+ TDqDataSourceConstraintTransformer()
: TVisitorTransformerBase(true)
- , State_(std::move(state))
{
AddHandler({
TCoConfigure::CallableName(),
@@ -31,14 +30,12 @@ public:
TStatus HandleDefault(TExprBase, TExprContext&) {
return TStatus::Ok;
}
-private:
- const TDqState::TPtr State_;
};
}
-THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer(TDqState::TPtr state) {
- return THolder<IGraphTransformer>(new TDqDataSourceConstraintTransformer(std::move(state)));
+THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer() {
+ return THolder<IGraphTransformer>(new TDqDataSourceConstraintTransformer());
}
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h
index 22aa5716e9c..06639abfe45 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_constraints.h
@@ -1,12 +1,9 @@
#pragma once
-#include "yql_dq_state.h"
-
#include <ydb/library/yql/core/yql_graph_transformer.h>
namespace NYql {
-THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer(TDqState::TPtr state);
+THolder<IGraphTransformer> CreateDqDataSourceConstraintTransformer();
} // NYql
-