aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2023-10-24 14:02:02 +0300
committerdcherednik <dcherednik@ydb.tech>2023-10-24 15:02:39 +0300
commitd7bdf55cbf2375fa4a0d75d8b0d42cbaa39832fc (patch)
tree88acb4ae93d9565d977d69f4b7f96a1d1ad0551c
parent192ac5495fea615e8b7edd968d3f98fe7696c8b0 (diff)
downloadydb-d7bdf55cbf2375fa4a0d75d8b0d42cbaa39832fc.tar.gz
Make uniq helper a bit more clear
Make uniq helper a bit more clear Pull Request resolved: https://github.com/ydb-platform/ydb/pull/404
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp16
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp294
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h42
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp16
4 files changed, 237 insertions, 131 deletions
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp
index db291902bbc..e81a91e87ce 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp
@@ -16,15 +16,15 @@ TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, cons
return {};
}
- TUniqBuildHelper helper(table, pos, ctx, false);
- auto computeKeysStage = helper.CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx);
+ auto helper = CreateInsertUniqBuildHelper(table, pos, ctx);
+ auto computeKeysStage = helper->CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx);
- auto inputPrecompute = helper.CreateInputPrecompute(computeKeysStage, pos, ctx);
- auto uniquePrecomputes = helper.CreateUniquePrecompute(computeKeysStage, pos, ctx);
+ auto inputPrecompute = helper->CreateInputPrecompute(computeKeysStage, pos, ctx);
+ auto uniquePrecomputes = helper->CreateUniquePrecompute(computeKeysStage, pos, ctx);
auto _true = MakeBool(pos, true, ctx);
- auto aggrStage = helper.CreateLookupExistStage(computeKeysStage, table, _true, pos, ctx);
+ auto aggrStage = helper->CreateLookupExistStage(computeKeysStage, table, _true, pos, ctx);
// Returns <bool>: <true> - no existing keys, <false> - at least one key exists
auto noExistingKeysPrecompute = Build<TDqPhyPrecompute>(ctx, pos)
@@ -43,14 +43,14 @@ TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, cons
}
TVector<TExprNode::TPtr> Bodies;
TVector<TCoArgument> Args;
- } uniqueCheckNodes(helper.GetChecksNum());
+ } uniqueCheckNodes(helper->GetChecksNum());
TCoArgument noExistingKeysArg(ctx.NewArgument(pos, "no_existing_keys"));
TExprNode::TPtr noExistingKeysCheck;
// Build condition checks depending on INSERT kind
if (abortOnError) {
- for (size_t i = 0; i < helper.GetChecksNum(); i++) {
+ for (size_t i = 0; i < helper->GetChecksNum(); i++) {
uniqueCheckNodes.Args.emplace_back(ctx.NewArgument(pos, "are_keys_unique"));
uniqueCheckNodes.Bodies.emplace_back(Build<TKqpEnsure>(ctx, pos)
.Value(_true)
@@ -68,7 +68,7 @@ TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, cons
.Message(MakeMessage("Conflict with existing key.", pos, ctx))
.Done().Ptr();
} else {
- for (size_t i = 0; i < helper.GetChecksNum(); i++) {
+ for (size_t i = 0; i < helper->GetChecksNum(); i++) {
uniqueCheckNodes.Args.emplace_back(ctx.NewArgument(pos, "are_keys_unique"));
uniqueCheckNodes.Bodies.emplace_back(uniqueCheckNodes.Args.back().Ptr());
}
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
index dc17cc56519..ea2b23015ab 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
@@ -18,49 +18,159 @@ struct TLookupNodes {
TVector<TCoArgument> Args;
};
-TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t stageOut,
- const NYql::TKikimrTableMetadata& mainTableMeta, int indexId, TExprNode::TPtr _false,
- std::pair<TExprNode::TPtr, size_t> pkChecks, TPositionHandle pos, TExprContext& ctx)
+
+NYql::TExprNode::TPtr MakeUniqCheckDict(const TCoLambda& selector,
+ const TExprBase& rowsListArg, TPositionHandle pos, TExprContext& ctx)
{
- const NYql::TKikimrTableMetadata* meta;
- if (indexId == -1) {
- pkChecks.first.Reset();
- meta = &mainTableMeta;
- } else {
- YQL_ENSURE((size_t)indexId < mainTableMeta.SecondaryGlobalIndexMetadata.size());
- meta = mainTableMeta.SecondaryGlobalIndexMetadata[indexId].Get();
+ return Build<TCoToDict>(ctx, pos)
+ .List(rowsListArg)
+ .KeySelector(selector)
+ .PayloadSelector()
+ .Args({"stub"})
+ .Body<TCoVoid>()
+ .Build()
+ .Build()
+ .Settings()
+ .Add().Build("One")
+ .Add().Build("Hashed")
+ .Build()
+ .Done().Ptr();
+}
+
+class TInsertUniqBuildHelper : public TUniqBuildHelper {
+public:
+ TInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
+ NYql::TExprContext& ctx)
+ : TUniqBuildHelper(table, pos, ctx, false)
+ {}
+
+private:
+ const NYql::TExprNode::TPtr GetPkDict() const override {
+ return {};
}
- TVector<TExprBase> inputs;
- TVector<TCoArgument> args;
+ TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t stageOut,
+ const NYql::TKikimrTableMetadata& mainTableMeta, TUniqCheckNodes::TIndexId indexId,
+ TPositionHandle pos, TExprContext& ctx) const override
+ {
+ const NYql::TKikimrTableMetadata* meta;
+ if (indexId == -1) {
+ meta = &mainTableMeta;
+ } else {
+ YQL_ENSURE((size_t)indexId < mainTableMeta.SecondaryGlobalIndexMetadata.size());
+ meta = mainTableMeta.SecondaryGlobalIndexMetadata[indexId].Get();
+ }
- inputs.emplace_back(Build<TDqPhyPrecompute>(ctx, pos)
- .Connection<TDqCnValue>()
- .Output()
- .Stage(computeKeysStage)
- .Index().Build(IntToString<10>(stageOut))
- .Build()
- .Build()
- .Done()
- );
+ auto inputs = Build<TDqPhyPrecompute>(ctx, pos)
+ .Connection<TDqCnValue>()
+ .Output()
+ .Stage(computeKeysStage)
+ .Index().Build(IntToString<10>(stageOut))
+ .Build()
+ .Build()
+ .Done();
- args.emplace_back(
- Build<TCoArgument>(ctx, pos)
+ auto args = Build<TCoArgument>(ctx, pos)
.Name(TString("arg0"))
+ .Done();
+
+ auto lambda = Build<TCoLambda>(ctx, pos)
+ .Args({"row"})
+ .Body<TCoJust>()
+ .Input(False)
+ .Build()
.Done()
- );
+ .Ptr();
+
+ auto stage = Build<TDqStage>(ctx, pos)
+ .Inputs()
+ .Add(inputs)
+ .Build()
+ .Program()
+ .Args(args)
+ .Body<TCoFlatMap>()
+ .Input<TCoTake>()
+ .Input<TKqpLookupTable>()
+ .Table(BuildTableMeta(*meta, pos, ctx))
+ .LookupKeys<TCoIterator>()
+ .List(args)
+ .Build()
+ .Columns()
+ .Build()
+ .Build()
+ .Count<TCoUint64>()
+ .Literal().Build("1")
+ .Build()
+ .Build()
+ .Lambda(lambda)
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Done();
+
+ return Build<TDqCnUnionAll>(ctx, pos)
+ .Output()
+ .Stage(stage)
+ .Index().Build("0")
+ .Build()
+ .Done();
+ }
+};
+
+class TUpsertUniqBuildHelper : public TUniqBuildHelper {
+public:
+ TUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
+ NYql::TExprContext& ctx)
+ : TUniqBuildHelper(table, pos, ctx, true)
+ , PkDict(MakeUniqCheckDict(MakeTableKeySelector(table.Metadata, pos, ctx), RowsListArg, pos, ctx))
+ {}
+
+private:
+ const NYql::TExprNode::TPtr GetPkDict() const override {
+ return PkDict;
+ }
+
+ TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t stageOut,
+ const NYql::TKikimrTableMetadata& mainTableMeta, TUniqCheckNodes::TIndexId indexId,
+ TPositionHandle pos, TExprContext& ctx) const override
+ {
+ const NYql::TKikimrTableMetadata* meta;
+ if (indexId == -1) {
+ meta = &mainTableMeta;
+ } else {
+ YQL_ENSURE((size_t)indexId < mainTableMeta.SecondaryGlobalIndexMetadata.size());
+ meta = mainTableMeta.SecondaryGlobalIndexMetadata[indexId].Get();
+ }
+
+ TVector<TExprBase> inputs;
+ TVector<TCoArgument> args;
+
+ inputs.emplace_back(Build<TDqPhyPrecompute>(ctx, pos)
+ .Connection<TDqCnValue>()
+ .Output()
+ .Stage(computeKeysStage)
+ .Index().Build(IntToString<10>(stageOut))
+ .Build()
+ .Build()
+ .Done()
+ );
+
+ args.emplace_back(
+ Build<TCoArgument>(ctx, pos)
+ .Name(TString("arg0"))
+ .Done()
+ );
- NYql::TExprNode::TPtr lambda;
- TVector<TExprBase> columnsToSelect;
+ NYql::TExprNode::TPtr lambda;
+ TVector<TExprBase> columnsToSelect;
- if (pkChecks.first) {
columnsToSelect.reserve(mainTableMeta.KeyColumnNames.size());
inputs.emplace_back(Build<TDqPhyPrecompute>(ctx, pos)
.Connection<TDqCnValue>()
.Output()
.Stage(computeKeysStage)
- .Index().Build(IntToString<10>(pkChecks.second))
+ .Index().Build(IntToString<10>(CalcComputeKeysStageOutputNum()))
.Build()
.Build()
.Done()
@@ -87,88 +197,59 @@ TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage,
.Lookup("row_from_index")
.Build()
.Build()
- .Value(_false)
+ .Value(False)
.Build()
.Done()
.Ptr();
- } else {
- lambda = Build<TCoLambda>(ctx, pos)
- .Args({"row"})
- .Body<TCoJust>()
- .Input(_false)
- .Build()
- .Done()
- .Ptr();
- }
- auto stage = Build<TDqStage>(ctx, pos)
- .Inputs()
- .Add(inputs)
- .Build()
- .Program()
- .Args(args)
- .Body<TCoFlatMap>()
- .Input<TCoTake>()
- .Input<TKqpLookupTable>()
- .Table(BuildTableMeta(*meta, pos, ctx))
- .LookupKeys<TCoIterator>()
- .List(args[0])
+ auto stage = Build<TDqStage>(ctx, pos)
+ .Inputs()
+ .Add(inputs)
+ .Build()
+ .Program()
+ .Args(args)
+ .Body<TCoFlatMap>()
+ .Input<TCoTake>()
+ .Input<TKqpLookupTable>()
+ .Table(BuildTableMeta(*meta, pos, ctx))
+ .LookupKeys<TCoIterator>()
+ .List(args[0])
+ .Build()
+ .Columns<TCoAtomList>()
+ .Add(columnsToSelect)
+ .Build()
.Build()
- .Columns<TCoAtomList>()
- .Add(columnsToSelect)
+ .Count<TCoUint64>()
+ .Literal().Build("1")
.Build()
.Build()
- .Count<TCoUint64>()
- .Literal().Build("1")
- .Build()
+ .Lambda(lambda)
.Build()
- .Lambda(lambda)
.Build()
- .Build()
- .Settings().Build()
- .Done();
-
- return Build<TDqCnUnionAll>(ctx, pos)
- .Output()
- .Stage(stage)
- .Index().Build("0")
- .Build()
- .Done();
-}
+ .Settings().Build()
+ .Done();
-NYql::TExprNode::TPtr MakeUniqCheckDict(const TCoLambda& selector,
- const TExprBase& rowsListArg, TPositionHandle pos, TExprContext& ctx)
-{
- return Build<TCoToDict>(ctx, pos)
- .List(rowsListArg)
- .KeySelector(selector)
- .PayloadSelector()
- .Args({"stub"})
- .Body<TCoVoid>()
+ return Build<TDqCnUnionAll>(ctx, pos)
+ .Output()
+ .Stage(stage)
+ .Index().Build("0")
.Build()
- .Build()
- .Settings()
- .Add().Build("One")
- .Add().Build("Hashed")
- .Build()
- .Done().Ptr();
-}
+ .Done();
+ }
+private:
+ const NYql::TExprNode::TPtr PkDict;
+};
}
-std::pair<TVector<TUniqBuildHelper::TUniqCheckNodes>, NYql::TExprNode::TPtr> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg,
+TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg,
const TKikimrTableDescription& table,
TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
{
TVector<TUniqCheckNodes> checks;
- NYql::TExprNode::TPtr pkDict;
if (!skipPkCheck) {
checks.emplace_back(MakeUniqCheckNodes(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx));
- } else {
- // In case of update we must make additional filtering to exclude duplicate checks
- // non pk constraint for rows where pk was given
- pkDict = MakeUniqCheckDict(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx);
}
// make uniq check for each uniq constraint
@@ -199,12 +280,13 @@ std::pair<TVector<TUniqBuildHelper::TUniqCheckNodes>, NYql::TExprNode::TPtr> TUn
checks.back().IndexId = i;
}
- return {checks, pkDict};
+ return checks;
}
TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table,
TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
: RowsListArg(ctx.NewArgument(pos, "rows_list"))
+ , False(MakeBool(pos, false, ctx))
, Checks(Prepare(RowsListArg, table, pos, ctx, skipPkCheck))
{}
@@ -266,7 +348,7 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co
);
}
- if (auto dict = Checks.GetPkDict()) {
+ if (auto dict = GetPkDict()) {
types.emplace_back(
Build<TCoTypeOf>(ctx, pos)
.Value(dict)
@@ -309,7 +391,7 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co
);
}
- if (auto dict = Checks.GetPkDict()) {
+ if (auto dict = GetPkDict()) {
variants.emplace_back(
Build<TCoVariant>(ctx, pos)
.Item(dict)
@@ -376,18 +458,13 @@ TDqStage TUniqBuildHelper::CreateLookupExistStage(const TDqStage& computeKeysSta
{
TLookupNodes lookupNodes(Checks.Size());
- auto _false = MakeBool(pos, false, ctx);
-
- // last stage output is pk dict for update mode
- int pkDictOutputId = Checks.GetPkDict() ? CalcComputeKeysStageOutputNum() : -1;
-
// 0 output - input stream itself so start with output 1
// Each check produces 2 outputs
for (size_t i = 0, stage_out = 1; i < Checks.Size(); i++, stage_out += 2) {
const auto indexId = Checks[i].IndexId;
lookupNodes.Stages.emplace_back(
CreateLookupStageWithConnection(computeKeysStage, stage_out, *table.Metadata, indexId,
- _false, {Checks.GetPkDict(), pkDictOutputId}, pos, ctx)
+ pos, ctx)
);
lookupNodes.Args.emplace_back(
@@ -412,14 +489,31 @@ TDqStage TUniqBuildHelper::CreateLookupExistStage(const TDqStage& computeKeysSta
.State(_true)
.SwitchHandler()
.Args({"item", "state"})
- .Body(_false)
+ .Body(False)
.Build()
.UpdateHandler()
.Args({"item", "state"})
- .Body(_false)
+ .Body(False)
.Build()
.Build()
.Build()
.Settings().Build()
.Done();
}
+
+namespace NKikimr::NKqp::NOpt {
+
+
+TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
+ NYql::TExprContext& ctx)
+{
+ return std::make_unique<TInsertUniqBuildHelper>(table, pos, ctx);
+}
+
+TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
+ NYql::TExprContext& ctx)
+{
+ return std::make_unique<TUpsertUniqBuildHelper>(table, pos, ctx);
+}
+
+}
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h
index a188b4afe53..a22f4f9edf3 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h
@@ -11,10 +11,7 @@ namespace NKikimr::NKqp::NOpt {
class TUniqBuildHelper {
public:
- // table - metadata of table
- // skipPkCheck - false for insert mode, generate check on PK to issue an arror on PK conflict
- TUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
- NYql::TExprContext& ctx, bool skipPkCheck);
+ using TPtr = std::unique_ptr<TUniqBuildHelper>;
size_t GetChecksNum() const;
NYql::NNodes::TDqStage CreateComputeKeysStage(const TCondenseInputResult& condenseResult,
@@ -27,7 +24,14 @@ public:
const NYql::TKikimrTableDescription& table, NYql::TExprNode::TPtr _true,
NYql::TPositionHandle pos, NYql::TExprContext& ctx) const;
-private:
+ virtual ~TUniqBuildHelper() = default;
+protected:
+ // table - metadata of table
+ // skipPkCheck - false for insert mode, generate check on PK to issue an arror on PK conflict
+ TUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
+ NYql::TExprContext& ctx, bool skipPkCheck);
+ size_t CalcComputeKeysStageOutputNum() const;
+
struct TUniqCheckNodes {
using TIndexId = int;
static constexpr TIndexId NOT_INDEX_ID = -1;
@@ -36,11 +40,11 @@ private:
TIndexId IndexId = NOT_INDEX_ID;
};
+private:
class TChecks {
public:
- TChecks(std::pair<TVector<TUniqCheckNodes>, NYql::TExprNode::TPtr>&& pair)
- : Checks(std::move(pair.first))
- , PkDict(pair.second)
+ TChecks(TVector<TUniqCheckNodes> nodes)
+ : Checks(std::move(nodes))
{}
size_t Size() const {
@@ -51,24 +55,32 @@ private:
return Checks[i];
}
- const NYql::TExprNode::TPtr GetPkDict() const {
- return PkDict;
- }
private:
const TVector<TUniqCheckNodes> Checks;
- const NYql::TExprNode::TPtr PkDict;
};
- size_t CalcComputeKeysStageOutputNum() const;
static TUniqCheckNodes MakeUniqCheckNodes(const NYql::NNodes::TCoLambda& selector,
const NYql::NNodes::TExprBase& rowsListArg, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
- static std::pair<TVector<TUniqCheckNodes>, NYql::TExprNode::TPtr> Prepare(const NYql::NNodes::TCoArgument& rowsListArg,
+ static TVector<TUniqCheckNodes> Prepare(const NYql::NNodes::TCoArgument& rowsListArg,
const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx,
bool skipPkCheck);
- NYql::NNodes::TCoArgument RowsListArg;
+ virtual NYql::NNodes::TDqCnUnionAll CreateLookupStageWithConnection(const NYql::NNodes::TDqStage& computeKeysStage,
+ size_t stageOut, const NYql::TKikimrTableMetadata& mainTableMeta, TUniqCheckNodes::TIndexId indexId,
+ NYql::TPositionHandle pos, NYql::TExprContext& ctx) const = 0;
+ virtual const NYql::TExprNode::TPtr GetPkDict() const = 0;
+
+protected:
+ const NYql::NNodes::TCoArgument RowsListArg;
+ const NYql::TExprNode::TPtr False;
+private:
const TChecks Checks;
};
+TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
+ NYql::TExprContext& ctx);
+
+TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
+ NYql::TExprContext& ctx);
}
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
index e89cb670444..8aeaab248d0 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
@@ -308,18 +308,18 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c
return {};
}
- TUniqBuildHelper helper(table, pos, ctx, true);
- if (helper.GetChecksNum() == 0) {
+ auto helper = CreateUpsertUniqBuildHelper(table, pos, ctx);
+ if (helper->GetChecksNum() == 0) {
return condenseResult;
}
- auto computeKeysStage = helper.CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx);
- auto inputPrecompute = helper.CreateInputPrecompute(computeKeysStage, pos, ctx);
- auto uniquePrecomputes = helper.CreateUniquePrecompute(computeKeysStage, pos, ctx);
+ auto computeKeysStage = helper->CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx);
+ auto inputPrecompute = helper->CreateInputPrecompute(computeKeysStage, pos, ctx);
+ auto uniquePrecomputes = helper->CreateUniquePrecompute(computeKeysStage, pos, ctx);
auto _true = MakeBool(pos, true, ctx);
- auto aggrStage = helper.CreateLookupExistStage(computeKeysStage, table, _true, pos, ctx);
+ auto aggrStage = helper->CreateLookupExistStage(computeKeysStage, table, _true, pos, ctx);
// Returns <bool>: <true> - no existing keys, <false> - at least one key exists
auto noExistingKeysPrecompute = Build<TDqPhyPrecompute>(ctx, pos)
@@ -341,9 +341,9 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c
}
TVector<TExprNode::TPtr> Bodies;
TVector<TCoArgument> Args;
- } uniqueCheckNodes(helper.GetChecksNum());
+ } uniqueCheckNodes(helper->GetChecksNum());
- for (size_t i = 0; i < helper.GetChecksNum(); i++) {
+ for (size_t i = 0; i < helper->GetChecksNum(); i++) {
uniqueCheckNodes.Args.emplace_back(ctx.NewArgument(pos, "are_keys_unique"));
uniqueCheckNodes.Bodies.emplace_back(Build<TKqpEnsure>(ctx, pos)
.Value(_true)