diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2023-10-24 14:02:02 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-10-24 15:02:39 +0300 |
commit | d7bdf55cbf2375fa4a0d75d8b0d42cbaa39832fc (patch) | |
tree | 88acb4ae93d9565d977d69f4b7f96a1d1ad0551c | |
parent | 192ac5495fea615e8b7edd968d3f98fe7696c8b0 (diff) | |
download | ydb-d7bdf55cbf2375fa4a0d75d8b0d42cbaa39832fc.tar.gz |
Make uniq helper a bit more clear
Make uniq helper a bit more clear
Pull Request resolved: https://github.com/ydb-platform/ydb/pull/404
4 files changed, 237 insertions, 131 deletions
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp index db291902bbc..e81a91e87ce 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp @@ -16,15 +16,15 @@ TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, cons return {}; } - TUniqBuildHelper helper(table, pos, ctx, false); - auto computeKeysStage = helper.CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx); + auto helper = CreateInsertUniqBuildHelper(table, pos, ctx); + auto computeKeysStage = helper->CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx); - auto inputPrecompute = helper.CreateInputPrecompute(computeKeysStage, pos, ctx); - auto uniquePrecomputes = helper.CreateUniquePrecompute(computeKeysStage, pos, ctx); + auto inputPrecompute = helper->CreateInputPrecompute(computeKeysStage, pos, ctx); + auto uniquePrecomputes = helper->CreateUniquePrecompute(computeKeysStage, pos, ctx); auto _true = MakeBool(pos, true, ctx); - auto aggrStage = helper.CreateLookupExistStage(computeKeysStage, table, _true, pos, ctx); + auto aggrStage = helper->CreateLookupExistStage(computeKeysStage, table, _true, pos, ctx); // Returns <bool>: <true> - no existing keys, <false> - at least one key exists auto noExistingKeysPrecompute = Build<TDqPhyPrecompute>(ctx, pos) @@ -43,14 +43,14 @@ TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, cons } TVector<TExprNode::TPtr> Bodies; TVector<TCoArgument> Args; - } uniqueCheckNodes(helper.GetChecksNum()); + } uniqueCheckNodes(helper->GetChecksNum()); TCoArgument noExistingKeysArg(ctx.NewArgument(pos, "no_existing_keys")); TExprNode::TPtr noExistingKeysCheck; // Build condition checks depending on INSERT kind if (abortOnError) { - for (size_t i = 0; i < helper.GetChecksNum(); i++) { + for (size_t i = 0; i < helper->GetChecksNum(); i++) { uniqueCheckNodes.Args.emplace_back(ctx.NewArgument(pos, "are_keys_unique")); uniqueCheckNodes.Bodies.emplace_back(Build<TKqpEnsure>(ctx, pos) .Value(_true) @@ -68,7 +68,7 @@ TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, cons .Message(MakeMessage("Conflict with existing key.", pos, ctx)) .Done().Ptr(); } else { - for (size_t i = 0; i < helper.GetChecksNum(); i++) { + for (size_t i = 0; i < helper->GetChecksNum(); i++) { uniqueCheckNodes.Args.emplace_back(ctx.NewArgument(pos, "are_keys_unique")); uniqueCheckNodes.Bodies.emplace_back(uniqueCheckNodes.Args.back().Ptr()); } diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp index dc17cc56519..ea2b23015ab 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp @@ -18,49 +18,159 @@ struct TLookupNodes { TVector<TCoArgument> Args; }; -TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t stageOut, - const NYql::TKikimrTableMetadata& mainTableMeta, int indexId, TExprNode::TPtr _false, - std::pair<TExprNode::TPtr, size_t> pkChecks, TPositionHandle pos, TExprContext& ctx) + +NYql::TExprNode::TPtr MakeUniqCheckDict(const TCoLambda& selector, + const TExprBase& rowsListArg, TPositionHandle pos, TExprContext& ctx) { - const NYql::TKikimrTableMetadata* meta; - if (indexId == -1) { - pkChecks.first.Reset(); - meta = &mainTableMeta; - } else { - YQL_ENSURE((size_t)indexId < mainTableMeta.SecondaryGlobalIndexMetadata.size()); - meta = mainTableMeta.SecondaryGlobalIndexMetadata[indexId].Get(); + return Build<TCoToDict>(ctx, pos) + .List(rowsListArg) + .KeySelector(selector) + .PayloadSelector() + .Args({"stub"}) + .Body<TCoVoid>() + .Build() + .Build() + .Settings() + .Add().Build("One") + .Add().Build("Hashed") + .Build() + .Done().Ptr(); +} + +class TInsertUniqBuildHelper : public TUniqBuildHelper { +public: + TInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, + NYql::TExprContext& ctx) + : TUniqBuildHelper(table, pos, ctx, false) + {} + +private: + const NYql::TExprNode::TPtr GetPkDict() const override { + return {}; } - TVector<TExprBase> inputs; - TVector<TCoArgument> args; + TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t stageOut, + const NYql::TKikimrTableMetadata& mainTableMeta, TUniqCheckNodes::TIndexId indexId, + TPositionHandle pos, TExprContext& ctx) const override + { + const NYql::TKikimrTableMetadata* meta; + if (indexId == -1) { + meta = &mainTableMeta; + } else { + YQL_ENSURE((size_t)indexId < mainTableMeta.SecondaryGlobalIndexMetadata.size()); + meta = mainTableMeta.SecondaryGlobalIndexMetadata[indexId].Get(); + } - inputs.emplace_back(Build<TDqPhyPrecompute>(ctx, pos) - .Connection<TDqCnValue>() - .Output() - .Stage(computeKeysStage) - .Index().Build(IntToString<10>(stageOut)) - .Build() - .Build() - .Done() - ); + auto inputs = Build<TDqPhyPrecompute>(ctx, pos) + .Connection<TDqCnValue>() + .Output() + .Stage(computeKeysStage) + .Index().Build(IntToString<10>(stageOut)) + .Build() + .Build() + .Done(); - args.emplace_back( - Build<TCoArgument>(ctx, pos) + auto args = Build<TCoArgument>(ctx, pos) .Name(TString("arg0")) + .Done(); + + auto lambda = Build<TCoLambda>(ctx, pos) + .Args({"row"}) + .Body<TCoJust>() + .Input(False) + .Build() .Done() - ); + .Ptr(); + + auto stage = Build<TDqStage>(ctx, pos) + .Inputs() + .Add(inputs) + .Build() + .Program() + .Args(args) + .Body<TCoFlatMap>() + .Input<TCoTake>() + .Input<TKqpLookupTable>() + .Table(BuildTableMeta(*meta, pos, ctx)) + .LookupKeys<TCoIterator>() + .List(args) + .Build() + .Columns() + .Build() + .Build() + .Count<TCoUint64>() + .Literal().Build("1") + .Build() + .Build() + .Lambda(lambda) + .Build() + .Build() + .Settings().Build() + .Done(); + + return Build<TDqCnUnionAll>(ctx, pos) + .Output() + .Stage(stage) + .Index().Build("0") + .Build() + .Done(); + } +}; + +class TUpsertUniqBuildHelper : public TUniqBuildHelper { +public: + TUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, + NYql::TExprContext& ctx) + : TUniqBuildHelper(table, pos, ctx, true) + , PkDict(MakeUniqCheckDict(MakeTableKeySelector(table.Metadata, pos, ctx), RowsListArg, pos, ctx)) + {} + +private: + const NYql::TExprNode::TPtr GetPkDict() const override { + return PkDict; + } + + TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t stageOut, + const NYql::TKikimrTableMetadata& mainTableMeta, TUniqCheckNodes::TIndexId indexId, + TPositionHandle pos, TExprContext& ctx) const override + { + const NYql::TKikimrTableMetadata* meta; + if (indexId == -1) { + meta = &mainTableMeta; + } else { + YQL_ENSURE((size_t)indexId < mainTableMeta.SecondaryGlobalIndexMetadata.size()); + meta = mainTableMeta.SecondaryGlobalIndexMetadata[indexId].Get(); + } + + TVector<TExprBase> inputs; + TVector<TCoArgument> args; + + inputs.emplace_back(Build<TDqPhyPrecompute>(ctx, pos) + .Connection<TDqCnValue>() + .Output() + .Stage(computeKeysStage) + .Index().Build(IntToString<10>(stageOut)) + .Build() + .Build() + .Done() + ); + + args.emplace_back( + Build<TCoArgument>(ctx, pos) + .Name(TString("arg0")) + .Done() + ); - NYql::TExprNode::TPtr lambda; - TVector<TExprBase> columnsToSelect; + NYql::TExprNode::TPtr lambda; + TVector<TExprBase> columnsToSelect; - if (pkChecks.first) { columnsToSelect.reserve(mainTableMeta.KeyColumnNames.size()); inputs.emplace_back(Build<TDqPhyPrecompute>(ctx, pos) .Connection<TDqCnValue>() .Output() .Stage(computeKeysStage) - .Index().Build(IntToString<10>(pkChecks.second)) + .Index().Build(IntToString<10>(CalcComputeKeysStageOutputNum())) .Build() .Build() .Done() @@ -87,88 +197,59 @@ TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, .Lookup("row_from_index") .Build() .Build() - .Value(_false) + .Value(False) .Build() .Done() .Ptr(); - } else { - lambda = Build<TCoLambda>(ctx, pos) - .Args({"row"}) - .Body<TCoJust>() - .Input(_false) - .Build() - .Done() - .Ptr(); - } - auto stage = Build<TDqStage>(ctx, pos) - .Inputs() - .Add(inputs) - .Build() - .Program() - .Args(args) - .Body<TCoFlatMap>() - .Input<TCoTake>() - .Input<TKqpLookupTable>() - .Table(BuildTableMeta(*meta, pos, ctx)) - .LookupKeys<TCoIterator>() - .List(args[0]) + auto stage = Build<TDqStage>(ctx, pos) + .Inputs() + .Add(inputs) + .Build() + .Program() + .Args(args) + .Body<TCoFlatMap>() + .Input<TCoTake>() + .Input<TKqpLookupTable>() + .Table(BuildTableMeta(*meta, pos, ctx)) + .LookupKeys<TCoIterator>() + .List(args[0]) + .Build() + .Columns<TCoAtomList>() + .Add(columnsToSelect) + .Build() .Build() - .Columns<TCoAtomList>() - .Add(columnsToSelect) + .Count<TCoUint64>() + .Literal().Build("1") .Build() .Build() - .Count<TCoUint64>() - .Literal().Build("1") - .Build() + .Lambda(lambda) .Build() - .Lambda(lambda) .Build() - .Build() - .Settings().Build() - .Done(); - - return Build<TDqCnUnionAll>(ctx, pos) - .Output() - .Stage(stage) - .Index().Build("0") - .Build() - .Done(); -} + .Settings().Build() + .Done(); -NYql::TExprNode::TPtr MakeUniqCheckDict(const TCoLambda& selector, - const TExprBase& rowsListArg, TPositionHandle pos, TExprContext& ctx) -{ - return Build<TCoToDict>(ctx, pos) - .List(rowsListArg) - .KeySelector(selector) - .PayloadSelector() - .Args({"stub"}) - .Body<TCoVoid>() + return Build<TDqCnUnionAll>(ctx, pos) + .Output() + .Stage(stage) + .Index().Build("0") .Build() - .Build() - .Settings() - .Add().Build("One") - .Add().Build("Hashed") - .Build() - .Done().Ptr(); -} + .Done(); + } +private: + const NYql::TExprNode::TPtr PkDict; +}; } -std::pair<TVector<TUniqBuildHelper::TUniqCheckNodes>, NYql::TExprNode::TPtr> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg, +TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg, const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx, bool skipPkCheck) { TVector<TUniqCheckNodes> checks; - NYql::TExprNode::TPtr pkDict; if (!skipPkCheck) { checks.emplace_back(MakeUniqCheckNodes(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx)); - } else { - // In case of update we must make additional filtering to exclude duplicate checks - // non pk constraint for rows where pk was given - pkDict = MakeUniqCheckDict(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx); } // make uniq check for each uniq constraint @@ -199,12 +280,13 @@ std::pair<TVector<TUniqBuildHelper::TUniqCheckNodes>, NYql::TExprNode::TPtr> TUn checks.back().IndexId = i; } - return {checks, pkDict}; + return checks; } TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx, bool skipPkCheck) : RowsListArg(ctx.NewArgument(pos, "rows_list")) + , False(MakeBool(pos, false, ctx)) , Checks(Prepare(RowsListArg, table, pos, ctx, skipPkCheck)) {} @@ -266,7 +348,7 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co ); } - if (auto dict = Checks.GetPkDict()) { + if (auto dict = GetPkDict()) { types.emplace_back( Build<TCoTypeOf>(ctx, pos) .Value(dict) @@ -309,7 +391,7 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co ); } - if (auto dict = Checks.GetPkDict()) { + if (auto dict = GetPkDict()) { variants.emplace_back( Build<TCoVariant>(ctx, pos) .Item(dict) @@ -376,18 +458,13 @@ TDqStage TUniqBuildHelper::CreateLookupExistStage(const TDqStage& computeKeysSta { TLookupNodes lookupNodes(Checks.Size()); - auto _false = MakeBool(pos, false, ctx); - - // last stage output is pk dict for update mode - int pkDictOutputId = Checks.GetPkDict() ? CalcComputeKeysStageOutputNum() : -1; - // 0 output - input stream itself so start with output 1 // Each check produces 2 outputs for (size_t i = 0, stage_out = 1; i < Checks.Size(); i++, stage_out += 2) { const auto indexId = Checks[i].IndexId; lookupNodes.Stages.emplace_back( CreateLookupStageWithConnection(computeKeysStage, stage_out, *table.Metadata, indexId, - _false, {Checks.GetPkDict(), pkDictOutputId}, pos, ctx) + pos, ctx) ); lookupNodes.Args.emplace_back( @@ -412,14 +489,31 @@ TDqStage TUniqBuildHelper::CreateLookupExistStage(const TDqStage& computeKeysSta .State(_true) .SwitchHandler() .Args({"item", "state"}) - .Body(_false) + .Body(False) .Build() .UpdateHandler() .Args({"item", "state"}) - .Body(_false) + .Body(False) .Build() .Build() .Build() .Settings().Build() .Done(); } + +namespace NKikimr::NKqp::NOpt { + + +TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, + NYql::TExprContext& ctx) +{ + return std::make_unique<TInsertUniqBuildHelper>(table, pos, ctx); +} + +TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, + NYql::TExprContext& ctx) +{ + return std::make_unique<TUpsertUniqBuildHelper>(table, pos, ctx); +} + +} diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h index a188b4afe53..a22f4f9edf3 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h @@ -11,10 +11,7 @@ namespace NKikimr::NKqp::NOpt { class TUniqBuildHelper { public: - // table - metadata of table - // skipPkCheck - false for insert mode, generate check on PK to issue an arror on PK conflict - TUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, - NYql::TExprContext& ctx, bool skipPkCheck); + using TPtr = std::unique_ptr<TUniqBuildHelper>; size_t GetChecksNum() const; NYql::NNodes::TDqStage CreateComputeKeysStage(const TCondenseInputResult& condenseResult, @@ -27,7 +24,14 @@ public: const NYql::TKikimrTableDescription& table, NYql::TExprNode::TPtr _true, NYql::TPositionHandle pos, NYql::TExprContext& ctx) const; -private: + virtual ~TUniqBuildHelper() = default; +protected: + // table - metadata of table + // skipPkCheck - false for insert mode, generate check on PK to issue an arror on PK conflict + TUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, + NYql::TExprContext& ctx, bool skipPkCheck); + size_t CalcComputeKeysStageOutputNum() const; + struct TUniqCheckNodes { using TIndexId = int; static constexpr TIndexId NOT_INDEX_ID = -1; @@ -36,11 +40,11 @@ private: TIndexId IndexId = NOT_INDEX_ID; }; +private: class TChecks { public: - TChecks(std::pair<TVector<TUniqCheckNodes>, NYql::TExprNode::TPtr>&& pair) - : Checks(std::move(pair.first)) - , PkDict(pair.second) + TChecks(TVector<TUniqCheckNodes> nodes) + : Checks(std::move(nodes)) {} size_t Size() const { @@ -51,24 +55,32 @@ private: return Checks[i]; } - const NYql::TExprNode::TPtr GetPkDict() const { - return PkDict; - } private: const TVector<TUniqCheckNodes> Checks; - const NYql::TExprNode::TPtr PkDict; }; - size_t CalcComputeKeysStageOutputNum() const; static TUniqCheckNodes MakeUniqCheckNodes(const NYql::NNodes::TCoLambda& selector, const NYql::NNodes::TExprBase& rowsListArg, NYql::TPositionHandle pos, NYql::TExprContext& ctx); - static std::pair<TVector<TUniqCheckNodes>, NYql::TExprNode::TPtr> Prepare(const NYql::NNodes::TCoArgument& rowsListArg, + static TVector<TUniqCheckNodes> Prepare(const NYql::NNodes::TCoArgument& rowsListArg, const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx, bool skipPkCheck); - NYql::NNodes::TCoArgument RowsListArg; + virtual NYql::NNodes::TDqCnUnionAll CreateLookupStageWithConnection(const NYql::NNodes::TDqStage& computeKeysStage, + size_t stageOut, const NYql::TKikimrTableMetadata& mainTableMeta, TUniqCheckNodes::TIndexId indexId, + NYql::TPositionHandle pos, NYql::TExprContext& ctx) const = 0; + virtual const NYql::TExprNode::TPtr GetPkDict() const = 0; + +protected: + const NYql::NNodes::TCoArgument RowsListArg; + const NYql::TExprNode::TPtr False; +private: const TChecks Checks; }; +TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, + NYql::TExprContext& ctx); + +TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, + NYql::TExprContext& ctx); } diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp index e89cb670444..8aeaab248d0 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp @@ -308,18 +308,18 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c return {}; } - TUniqBuildHelper helper(table, pos, ctx, true); - if (helper.GetChecksNum() == 0) { + auto helper = CreateUpsertUniqBuildHelper(table, pos, ctx); + if (helper->GetChecksNum() == 0) { return condenseResult; } - auto computeKeysStage = helper.CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx); - auto inputPrecompute = helper.CreateInputPrecompute(computeKeysStage, pos, ctx); - auto uniquePrecomputes = helper.CreateUniquePrecompute(computeKeysStage, pos, ctx); + auto computeKeysStage = helper->CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx); + auto inputPrecompute = helper->CreateInputPrecompute(computeKeysStage, pos, ctx); + auto uniquePrecomputes = helper->CreateUniquePrecompute(computeKeysStage, pos, ctx); auto _true = MakeBool(pos, true, ctx); - auto aggrStage = helper.CreateLookupExistStage(computeKeysStage, table, _true, pos, ctx); + auto aggrStage = helper->CreateLookupExistStage(computeKeysStage, table, _true, pos, ctx); // Returns <bool>: <true> - no existing keys, <false> - at least one key exists auto noExistingKeysPrecompute = Build<TDqPhyPrecompute>(ctx, pos) @@ -341,9 +341,9 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c } TVector<TExprNode::TPtr> Bodies; TVector<TCoArgument> Args; - } uniqueCheckNodes(helper.GetChecksNum()); + } uniqueCheckNodes(helper->GetChecksNum()); - for (size_t i = 0; i < helper.GetChecksNum(); i++) { + for (size_t i = 0; i < helper->GetChecksNum(); i++) { uniqueCheckNodes.Args.emplace_back(ctx.NewArgument(pos, "are_keys_unique")); uniqueCheckNodes.Bodies.emplace_back(Build<TKqpEnsure>(ctx, pos) .Value(_true) |