aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraakulaga <aakulaga@ydb.tech>2022-10-04 12:19:30 +0300
committeraakulaga <aakulaga@ydb.tech>2022-10-04 12:19:30 +0300
commit35bbcdd0f9d21e73beee28cb7fdd49d1109c2734 (patch)
treefcc0900a44de9b6b659efa54eba30172215dbbdc
parenta04bcbd6d1f7cef1565bd1a474f3860e8f2238a0 (diff)
downloadydb-35bbcdd0f9d21e73beee28cb7fdd49d1109c2734.tar.gz
Grace Join optimizer
Grace Join optimizer added
-rw-r--r--ydb/library/yql/core/expr_nodes/yql_expr_nodes.json15
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_join.cpp2
-rw-r--r--ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json5
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_join.cpp249
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp7
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h6
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.cpp11
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.h1
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp22
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp132
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.cpp1
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h3
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp3
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp1
15 files changed, 430 insertions, 32 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index dc77e6f1275..5c5625782d9 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -1439,6 +1439,21 @@
]
},
{
+ "Name": "TCoGraceJoinCore",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "GraceJoinCore"},
+ "Children": [
+ {"Index": 0, "Name": "LeftInput", "Type": "TExprBase", "Optional": true},
+ {"Index": 1, "Name": "RightInput", "Type": "TExprBase" , "Optional": true},
+ {"Index": 2, "Name": "JoinKind", "Type": "TCoAtom" , "Optional": true},
+ {"Index": 3, "Name": "LeftKeysColumns", "Type": "TCoAtomList" , "Optional": true},
+ {"Index": 4, "Name": "RightKeysColumns", "Type": "TCoAtomList", "Optional": true},
+ {"Index": 5, "Name": "LeftRenames", "Type": "TCoAtomList", "Optional": true},
+ {"Index": 6, "Name": "RightRenames", "Type": "TCoAtomList", "Optional": true}
+ ]
+ },
+
+ {
"Name": "TCoJoinDict",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "JoinDict"},
diff --git a/ydb/library/yql/core/type_ann/type_ann_join.cpp b/ydb/library/yql/core/type_ann/type_ann_join.cpp
index cd7c8f54ec1..4d71278884b 100644
--- a/ydb/library/yql/core/type_ann/type_ann_join.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_join.cpp
@@ -653,7 +653,7 @@ namespace NTypeAnnImpl {
}
const TTypeAnnotationNode* leftItemType = nullptr;
- if (!EnsureNewSeqType<true>(input->Head(), ctx.Expr, &leftItemType)) {
+ if (!EnsureNewSeqType<true>(*input->Child(0), ctx.Expr, &leftItemType)) {
return IGraphTransformer::TStatus::Error;
}
diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
index 11e2d47da40..6fd47f46b1b 100644
--- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
+++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
@@ -47,6 +47,11 @@
"Match": {"Type": "Callable", "Name": "DqPhyMapJoin"}
},
{
+ "Name": "TDqPhyGraceJoin",
+ "Base": "TDqJoinBase",
+ "Match": {"Type": "Callable", "Name": "GraceJoinCore"}
+ },
+ {
"Name": "TDqPhyCrossJoin",
"Base": "TDqJoinBase",
"Match": {"Type": "Callable", "Name": "DqPhyCrossJoin"}
diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp
index 7ce13c9b83d..5fa805102dc 100644
--- a/ydb/library/yql/dq/opt/dq_opt_join.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp
@@ -5,6 +5,7 @@
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
+#include <ydb/library/yql/core/yql_type_helpers.h>
namespace NYql::NDq {
@@ -271,6 +272,7 @@ std::pair<TVector<TCoAtom>, TVector<TCoAtom>> GetJoinKeys(const TDqJoin& join, T
return std::make_pair(std::move(leftJoinKeys), std::move(rightJoinKeys));
}
+
TDqPhyMapJoin DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, const TExprBase& rightInput,
TExprContext& ctx)
{
@@ -690,6 +692,7 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext&
return newConnection.Cast();
}
+
TExprBase DqBuildJoinDict(const TDqJoin& join, TExprContext& ctx) {
auto joinType = join.JoinType().Value();
@@ -840,4 +843,250 @@ TExprBase DqBuildJoinDict(const TDqJoin& join, TExprContext& ctx) {
return join;
}
+TExprBase DqBuildGraceJoin(const TDqJoin& join, TExprContext& ctx) {
+
+ static const std::set<std::string_view> supportedTypes = {
+ "Inner"sv,
+ "Left"sv,
+ "Cross"sv,
+ "LeftOnly"sv,
+ "LeftSemi"sv,
+ "Right"sv,
+ "RightOnly"sv,
+ "RightSemi"sv,
+ "Full"sv,
+ "Exclusion"sv
+ };
+
+ auto joinType = join.JoinType().Value();
+
+ if (!supportedTypes.contains(joinType)) {
+ return join;
+ }
+
+ auto buildShuffle = [&ctx, &join](const TExprBase& input, const TVector<TCoAtom>& keys) {
+ auto stage = Build<TDqStage>(ctx, join.Pos())
+ .Inputs()
+ .Add(input)
+ .Build()
+ .Program()
+ .Args({"stream"})
+ .Body("stream")
+ .Build()
+ .Settings(TDqStageSettings().BuildNode(ctx, join.Pos()))
+ .Done();
+
+ return Build<TDqCnHashShuffle>(ctx, join.Pos())
+ .Output()
+ .Stage(stage)
+ .Index().Build("0")
+ .Build()
+ .KeyColumns()
+ .Add(keys)
+ .Build()
+ .Done();
+ };
+
+ TMaybeNode<TDqStage> joinStage;
+
+ auto leftCn = join.LeftInput().Cast<TDqCnUnionAll>();
+ auto rightCn = join.RightInput().Cast<TDqCnUnionAll>();
+
+ const TStructExprType* leftStructType = nullptr;
+ auto leftSeqType = GetSequenceItemType(leftCn, false, ctx);
+ leftStructType = leftSeqType->Cast<TStructExprType>();
+
+ const TStructExprType* rightStructType = nullptr;
+ auto rightSeqType = GetSequenceItemType(rightCn, false, ctx);
+ rightStructType = rightSeqType->Cast<TStructExprType>();
+
+
+ const TVector<const TItemExprType*> & leftItems = leftStructType->GetItems();
+ const TVector<const TItemExprType*> & rightItems = rightStructType->GetItems();
+
+ std::map<TStringBuf, ui32> leftNames;
+ for (ui32 i = 0; i < leftItems.size(); i++) {
+ auto v = leftItems[i];
+ leftNames.emplace(v->GetName(), i);
+ }
+
+ std::map<TStringBuf, ui32> rightNames;
+ for (ui32 i = 0; i < rightItems.size(); i++) {
+ auto v = rightItems[i];
+ rightNames.emplace(v->GetName(), i);
+ }
+
+ auto [leftJoinKeys, rightJoinKeys] = GetJoinKeys(join, ctx);
+
+ auto leftJoinKeysVar = leftJoinKeys;
+ auto rightJoinKeysVar = rightJoinKeys;
+
+ auto rightShuffle = buildShuffle(rightCn, rightJoinKeys);
+ auto leftShuffle = buildShuffle(leftCn, leftJoinKeys);
+
+ TCoArgument leftInputArg{ctx.NewArgument(join.Pos(), "_dq_join_left")};
+ TCoArgument rightInputArg{ctx.NewArgument(join.Pos(), "_dq_join_right")};
+
+ auto leftWideFlow = ctx.Builder(join.Pos())
+ .Callable("ExpandMap")
+ .Add(0, leftInputArg.Ptr())
+ .Lambda(1)
+ .Param("item")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i = 0U; i < leftItems.size(); ++i) {
+ parent.Callable(i, "Member")
+ .Arg(0, "item")
+ .Atom(1, leftItems[i]->GetName())
+ .Seal();
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto rightWideFlow = ctx.Builder(join.Pos())
+ .Callable("ExpandMap")
+ .Add(0, rightInputArg.Ptr())
+ .Lambda(1)
+ .Param("item")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i = 0U; i < rightItems.size(); ++i) {
+ parent.Callable(i, "Member")
+ .Arg(0, "item")
+ .Atom(1, rightItems[i]->GetName())
+ .Seal();
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Build();
+
+ auto graceJoin = ctx.Builder(join.Pos())
+ .Callable("GraceJoinCore")
+ .Add(0, leftWideFlow)
+ .Add(1, rightWideFlow)
+ .Atom(2, joinType)
+ .List(3)
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i = 0U; i < leftJoinKeysVar.size(); ++i) {
+ parent.Atom(i, std::to_string(leftNames[leftJoinKeysVar[i]]) );
+ }
+ return parent;
+ })
+ .Seal()
+ .List(4)
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i = 0U; i < rightJoinKeysVar.size(); ++i) {
+ parent.Atom(i, std::to_string(rightNames[rightJoinKeysVar[i]]) );
+ }
+ return parent;
+ })
+ .Seal()
+ .List(5)
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ for (ui32 i = 0U; i < leftNames.size(); ++i) {
+ parent.Atom(2*i, std::to_string(i));
+ parent.Atom(2*i + 1, std::to_string(i));
+ }
+ return parent;
+ })
+ .Seal()
+ .List(6)
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ ui32 colIndexStart = leftNames.size();
+ for (ui32 i = 0U; i < rightNames.size(); ++i) {
+ parent.Atom(2*i, std::to_string(i) );
+ parent.Atom(2*i + 1, std::to_string(colIndexStart + i) );
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Build();
+
+ TStringBuf leftTableName, rightTableName;
+
+ if ( join.LeftLabel().Ref().IsAtom() ) {
+ leftTableName = join.LeftLabel().Cast<TCoAtom>().Value();
+ }
+
+ if ( join.RightLabel().Ref().IsAtom() ) {
+ rightTableName = join.RightLabel().Cast<TCoAtom>().Value();
+ }
+
+
+ TVector<TString> fullColNames;
+
+
+ for (const auto & v: leftNames ) {
+ auto name = FullColumnName(leftTableName, v.first);
+ if ( leftTableName.size() > 0) {
+ fullColNames.emplace_back(name);
+ } else {
+ fullColNames.emplace_back(v.first);
+ }
+ }
+
+ for (const auto & v: rightNames ) {
+ auto name = FullColumnName(rightTableName, v.first);
+ if ( rightTableName.size() > 0) {
+ fullColNames.emplace_back(name);
+ } else {
+ fullColNames.emplace_back(v.first);
+ }
+ }
+
+
+ auto narrowMapJoin = ctx.Builder(join.Pos())
+ .Callable("NarrowMap")
+ .Add(0, graceJoin)
+ .Lambda(1)
+ .Params("fields", fullColNames.size())
+ .Callable("AsStruct")
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ ui32 i = 0U;
+ for (const auto& colName : fullColNames) {
+ parent.List(i)
+ .Atom(0, colName)
+ .Arg(1, "fields", i)
+ .Seal();
+ ++i;
+ }
+ return parent;
+ })
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+
+ joinStage = Build<TDqStage>(ctx, join.Pos())
+ .Inputs()
+ .Add(leftShuffle)
+ .Add(rightShuffle)
+ .Build()
+ .Program()
+ .Args({leftInputArg, rightInputArg})
+ .Body(narrowMapJoin)
+ .Build()
+ .Settings(TDqStageSettings().BuildNode(ctx, join.Pos()))
+ .Done();
+
+
+
+ if (joinStage) {
+ return Build<TDqCnUnionAll>(ctx, join.Pos())
+ .Output()
+ .Stage(joinStage.Cast())
+ .Index().Build("0")
+ .Build()
+ .Done();
+ }
+
+ return join;
+}
+
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index d457c0780fa..ee6a8108649 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -2055,8 +2055,9 @@ TMaybeNode<TDqJoin> DqFlipJoin(const TDqJoin& join, TExprContext& ctx) {
.Done();
}
+
TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx,
- const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage)
+ const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, bool useGraceJoin)
{
if (!node.Maybe<TDqJoin>()) {
return node;
@@ -2078,6 +2079,10 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon
auto joinType = join.JoinType().Value();
+ if (useGraceJoin) {
+ return DqBuildGraceJoin(join, ctx);
+ }
+
if (joinType == "Full"sv || joinType == "Exclusion"sv) {
return DqBuildJoinDict(join, ctx);
}
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index 87b8f5a7041..d955441fa20 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -75,7 +75,11 @@ NNodes::TExprBase DqRewriteLeftPureJoin(const NNodes::TExprBase node, TExprConte
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx,
IOptimizationContext& optCtx);
-NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage);
+NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx,
+ IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, bool useGraceJoin = false);
+
+NNodes::TExprBase DqBuildGraceJoin(const NNodes::TDqJoin& join, TExprContext& ctx);
+
bool DqValidateJoinInputs(
const NNodes::TExprBase& left, const NNodes::TExprBase& right, const TParentsMap& parentsMap,
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
index 7d8428140cb..f533dbd948c 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
@@ -722,6 +722,17 @@ TStatus AnnotateDqMapOrDictJoin(const TExprNode::TPtr& input, TExprContext& ctx)
return TStatus::Ok;
}
+TStatus AnnotateDqGraceJoin(const TExprNode::TPtr& input, TExprContext& ctx) {
+ auto resultRowType = GetDqJoinResultType<true>(input, true, ctx);
+ if (!resultRowType) {
+ return TStatus::Error;
+ }
+
+ input->SetTypeAnn(ctx.MakeType<TFlowExprType>(resultRowType));
+ return TStatus::Ok;
+}
+
+
TStatus AnnotateDqCrossJoin(const TExprNode::TPtr& input, TExprContext& ctx) {
auto resultRowType = GetDqJoinResultType<false>(input, true, ctx);
if (!resultRowType) {
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.h b/ydb/library/yql/dq/type_ann/dq_type_ann.h
index afcc29eb88e..92269430375 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.h
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.h
@@ -18,6 +18,7 @@ IGraphTransformer::TStatus AnnotateDqConnection(const TExprNode::TPtr& input, TE
IGraphTransformer::TStatus AnnotateDqCnMerge(const TExprNode::TPtr& input, TExprContext& ctx);
IGraphTransformer::TStatus AnnotateDqJoin(const TExprNode::TPtr& input, TExprContext& ctx);
IGraphTransformer::TStatus AnnotateDqMapOrDictJoin(const TExprNode::TPtr& input, TExprContext& ctx);
+IGraphTransformer::TStatus AnnotateDqGraceJoin(const TExprNode::TPtr& input, TExprContext& ctx);
IGraphTransformer::TStatus AnnotateDqCrossJoin(const TExprNode::TPtr& input, TExprContext& ctx);
IGraphTransformer::TStatus AnnotateDqSource(const TExprNode::TPtr& input, TExprContext& ctx);
IGraphTransformer::TStatus AnnotateDqSink(const TExprNode::TPtr& input, TExprContext& ctx);
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
index 24fedca4b22..27c64571cb2 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
@@ -387,7 +387,9 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum;
DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum;
- TupleIntVals.resize(2 * totalIntColumnsNum );
+ NullsBitmapSize = (nColumns / (8 * sizeof(ui64)) + 1) ;
+
+ TupleIntVals.resize(2 * totalIntColumnsNum + NullsBitmapSize);
TupleStrings.resize(totalStrColumnsNum);
TupleStrSizes.resize(totalStrColumnsNum);
@@ -399,10 +401,10 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
{
if (a.IsKeyColumn && !b.IsKeyColumn) return true;
if (a.Bytes > b.Bytes) return true;
- if (a.ColumnIdx > b.ColumnIdx ) return true;
+ if (a.ColumnIdx < b.ColumnIdx ) return true;
return false;
});
-
+
Offsets.resize(nColumns);
PackedIdx.resize(nColumns);
TupleHolder.resize(nColumns);
@@ -410,7 +412,6 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
std::transform(TupleHolder.begin(), TupleHolder.end(), std::back_inserter(TuplePtrs), [](NUdf::TUnboxedValue& v) { return std::addressof(v); });
- NullsBitmapSize = (nColumns / (8 * sizeof(ui64)) + 1) ;
ui32 currIntOffset = NullsBitmapSize * sizeof(ui64) ;
ui32 currStrOffset = 0;
ui32 currIdx = 0;
@@ -618,7 +619,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- while (!*JoinCompleted) {
+ while (!*JoinCompleted) {
const NKikimr::NMiniKQL::EFetchResult resultLeft = FlowLeft->FetchValues(ctx, LeftPacker->TuplePtrs.data());
const NKikimr::NMiniKQL::EFetchResult resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data());
@@ -653,14 +654,21 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox
auto &valsLeft = LeftPacker->TupleHolder;
auto &valsRight = RightPacker->TupleHolder;
+
for (ui32 i = 0; i < LeftRenames.size() / 2; i++)
{
- *output[LeftRenames[2 * i + 1]] = valsLeft[LeftRenames[2 * i]];
+ auto & valPtr = output[LeftRenames[2 * i + 1]];
+ if ( valPtr ) {
+ *valPtr = valsLeft[LeftRenames[2 * i]];
+ }
}
for (ui32 i = 0; i < RightRenames.size() / 2; i++)
{
- *output[RightRenames[2 * i + 1]] = valsRight[RightRenames[2 * i]];
+ auto & valPtr = output[RightRenames[2 * i + 1]];
+ if ( valPtr ) {
+ *valPtr = valsRight[RightRenames[2 * i]];
+ }
}
return EFetchResult::One;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
index 76cd026c517..8909a2c7016 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
@@ -367,7 +367,9 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) {
char *strPtr = nullptr;
if(NumberOfKeyStringColumns != 0) {
keyStringsOffset = tb.StringsOffsets[stringsOffsetsIdx] + HeaderSize;
- strPtr = reinterpret_cast<char *>(tb.KeyIntVals[keyStringsOffset]);
+
+ strPtr = reinterpret_cast<char *>(tb.KeyIntVals.data() + keyStringsOffset);
+
for (ui64 i = 0; i < NumberOfKeyStringColumns; ++i)
{
td.StrColumns[i] = strPtr;
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
index 2d42af3b417..99622ac38e1 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
@@ -25,7 +25,7 @@ constexpr bool IsVerbose = false;
Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinMemTest) {
Y_UNIT_TEST(TestMem1) {
-
+
const ui64 TupleSize = 1024;
const ui64 NBuckets = 128;
const ui64 NTuples = 10000;
@@ -162,14 +162,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinMemTest) {
CTEST << Endl;
UNIT_ASSERT(true);
-
+
}
}
Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
- Y_UNIT_TEST(TestImp1) {
+ Y_UNIT_TEST_LLVM(TestImp1) {
ui64 tuple[11] = {0,1,2,3,4,5,6,7,8,9,10};
ui32 strSizes[2] = {4, 4};
@@ -179,7 +179,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
(char *)"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"};
ui32 bigStrSize[2] = {151, 151};
-
+
GraceJoin::TTable bigTable(1,1,1,1);
GraceJoin::TTable smallTable(1,1,1,1);
@@ -189,7 +189,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
const ui64 TupleSize = 1024;
- ui64 bigTuple[TupleSize];
+ ui64 bigTuple[TupleSize];
for (ui64 i = 0; i < TupleSize; i++) {
bigTuple[i] = std::rand() / ( RAND_MAX / 10000 );
@@ -197,13 +197,13 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
ui64 milliseconds = 0;
-
+
const ui64 BigTableTuples = 60000;
const ui64 SmallTableTuples = 15000;
const ui64 BigTupleSize = 32;
- std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now();
for ( ui64 i = 0; i < BigTableTuples; i++) {
@@ -229,7 +229,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
bigTable.Clear();
smallTable.Clear();
- begin03 = std::chrono::steady_clock::now();
+ begin03 = std::chrono::steady_clock::now();
for ( ui64 i = 0; i < BigTableTuples; i++) {
@@ -251,7 +251,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
CTEST << "Time for hash = " << milliseconds << "[ms]" << Endl;
CTEST << "Adding tuples speed: " << (BigTupleSize * (BigTableTuples + SmallTableTuples) * 1000) / ( milliseconds * 1024 * 1024) << "MB/sec" << Endl;
CTEST << Endl;
-
+
std::vector<ui64> vals1, vals2;
@@ -274,7 +274,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
ui64 numBigTuples = 0;
bigTable.ResetIterator();
- std::chrono::steady_clock::time_point begin04 = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point begin04 = std::chrono::steady_clock::now();
while(bigTable.NextTuple(td1)) { numBigTuples++; }
@@ -287,7 +287,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
numBigTuples = 0;
bigTable.ResetIterator();
- std::chrono::steady_clock::time_point begin041 = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point begin041 = std::chrono::steady_clock::now();
while(bigTable.NextTuple(td2)) { numBigTuples++; }
@@ -298,7 +298,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
CTEST << Endl;
- std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now();
joinTable.Join(bigTable,smallTable);
@@ -310,7 +310,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
ui64 numJoinedTuples = 0;
- std::chrono::steady_clock::time_point begin042 = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point begin042 = std::chrono::steady_clock::now();
while(joinTable.NextJoinedData(td1, td2)) { numJoinedTuples++; }
@@ -331,8 +331,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) {
}
}
+
+
Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
Y_UNIT_TEST_LLVM(TestInner1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -395,10 +398,81 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
+
+ }
+
+
+ Y_UNIT_TEST_LLVM(TestInnerStringKey1) {
+
+ for (ui32 pass = 0; pass < 1; ++pass) {
+ TSetup<LLVM> setup;
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto key1 = pb.NewDataLiteral<NUdf::EDataSlot::String>("1");
+ const auto key2 = pb.NewDataLiteral<NUdf::EDataSlot::String>("2");
+ const auto key3 = pb.NewDataLiteral<NUdf::EDataSlot::String>("4");
+ const auto key4 = pb.NewDataLiteral<NUdf::EDataSlot::String>("4");
+ const auto payload1 = pb.NewDataLiteral<NUdf::EDataSlot::String>("A");
+ const auto payload2 = pb.NewDataLiteral<NUdf::EDataSlot::String>("B");
+ const auto payload3 = pb.NewDataLiteral<NUdf::EDataSlot::String>("C");
+ const auto payload4 = pb.NewDataLiteral<NUdf::EDataSlot::String>("X");
+ const auto payload5 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Y");
+ const auto payload6 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Z");
+
+ const auto tupleType = pb.NewTupleType({
+ pb.NewDataType(NUdf::TDataType<char*>::Id),
+ pb.NewDataType(NUdf::TDataType<char*>::Id)
+ });
+
+ const auto list1 = pb.NewList(tupleType, {
+ pb.NewTuple({key1, payload1}),
+ pb.NewTuple({key2, payload2}),
+ pb.NewTuple({key3, payload3})
+ });
+
+ const auto list2 = pb.NewList(tupleType, {
+ pb.NewTuple({key2, payload4}),
+ pb.NewTuple({key3, payload5}),
+ pb.NewTuple({key4, payload6})
+ });
+
+
+ const auto resultType = pb.NewFlowType(pb.NewTupleType({
+ pb.NewDataType(NUdf::TDataType<char*>::Id),
+ pb.NewDataType(NUdf::TDataType<char*>::Id)
+ }));
+
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.GraceJoin(
+ pb.ExpandMap(pb.ToFlow(list1), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }),
+ pb.ExpandMap(pb.ToFlow(list2), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }),
+ EJoinKind::Inner, {0U}, {0U}, {1U, 0U}, {1U, 1U}, resultType),
+ [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); })
+ );
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ NUdf::TUnboxedValue tuple;
+
+ UNIT_ASSERT(iterator.Next(tuple));
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B");
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X");
+ UNIT_ASSERT(iterator.Next(tuple));
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C");
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y");
+ UNIT_ASSERT(iterator.Next(tuple));
+ UNIT_ASSERT(!iterator.Next(tuple));
+
+ }
+
}
+
+
Y_UNIT_TEST_LLVM(TMiniKQLGraceJoinTestInnerMulti1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -466,6 +540,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeft1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -537,6 +612,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeftMulti1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -609,6 +685,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeftSemi1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -671,6 +748,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeftOnly1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -728,6 +806,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeftSemiWithNullKey1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -792,6 +871,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestLeftOnlyWithNullKey1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -856,6 +936,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestRight1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -926,7 +1007,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
}
- Y_UNIT_TEST_LLVM(TestRightOnly1) {
+
+ Y_UNIT_TEST_LLVM(TestRightOnly1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -982,7 +1065,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
}
+
+
Y_UNIT_TEST_LLVM(TestRightSemi1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1044,7 +1130,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
}
+
Y_UNIT_TEST_LLVM(TestRightMulti1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1116,7 +1204,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
}
- Y_UNIT_TEST_LLVM(TestRightSemiWithNullKey1) {
+
+ Y_UNIT_TEST_LLVM(TestRightSemiWithNullKey1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1181,6 +1271,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestRightOnlyWithNullKey1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1245,6 +1336,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
Y_UNIT_TEST_LLVM(TestFull1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1309,9 +1401,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C");
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y");
UNIT_ASSERT(iterator.Next(tuple));
- UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A");
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A");
UNIT_ASSERT(!tuple.GetElement(1));
- UNIT_ASSERT(iterator.Next(tuple));
+ UNIT_ASSERT(iterator.Next(tuple));
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z");
UNIT_ASSERT(!tuple.GetElement(0));
UNIT_ASSERT(!iterator.Next(tuple));
@@ -1321,6 +1413,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
Y_UNIT_TEST_LLVM(TestExclusion1) {
+
for (ui32 pass = 0; pass < 1; ++pass) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -1373,9 +1466,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
UNIT_ASSERT(iterator.Next(tuple));
- UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A");
+ UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A");
UNIT_ASSERT(!tuple.GetElement(1));
- UNIT_ASSERT(iterator.Next(tuple));
+ UNIT_ASSERT(iterator.Next(tuple));
UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z");
UNIT_ASSERT(!tuple.GetElement(0));
UNIT_ASSERT(!iterator.Next(tuple));
@@ -1385,6 +1478,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) {
}
+
}
}
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 110b78eb312..716b7273cd7 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -47,6 +47,7 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, _FallbackOnRuntimeErrors);
REGISTER_SETTING(*this, WorkerFilter);
REGISTER_SETTING(*this, _EnablePrecompute);
+ REGISTER_SETTING(*this, EnableGraceJoin);
REGISTER_SETTING(*this, EnableDqReplicate);
REGISTER_SETTING(*this, WatermarksMode);
REGISTER_SETTING(*this, WatermarksGranularityMs);
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index dd4b1b35077..5d0fc8aa524 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -71,7 +71,7 @@ struct TDqSettings {
NCommon::TConfSetting<TString, false> _FallbackOnRuntimeErrors;
NCommon::TConfSetting<bool, false> _EnablePrecompute;
NCommon::TConfSetting<bool, false> EnableDqReplicate;
- NCommon::TConfSetting<bool, false> _EnableGraceJoin;
+ NCommon::TConfSetting<bool, false> EnableGraceJoin;
NCommon::TConfSetting<TString, false> WatermarksMode;
NCommon::TConfSetting<ui64, false> WatermarksGranularityMs;
@@ -114,6 +114,7 @@ struct TDqSettings {
SAVE_SETTING(WorkerFilter);
SAVE_SETTING(ComputeActorType);
SAVE_SETTING(WatermarksMode);
+ SAVE_SETTING(EnableGraceJoin);
SAVE_SETTING(WatermarksGranularityMs);
#undef SAVE_SETTING
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index ddd45515bb4..8ededee8059 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -301,7 +301,8 @@ protected:
TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
auto join = node.Cast<TDqJoin>();
const TParentsMap* parentsMap = getParents();
- return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */);
+ bool enableGraceJoin = Config->EnableGraceJoin.Get().GetOrElse(false);
+ return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */, enableGraceJoin);
}
TMaybeNode<TExprBase> BuildHasItems(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) {
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
index f6aa1df5420..ef5458527f6 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
@@ -33,6 +33,7 @@ public:
}
AddHandler({TDqJoin::CallableName()}, Hndl(&NDq::AnnotateDqJoin));
AddHandler({TDqPhyMapJoin::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin));
+ AddHandler({TDqPhyGraceJoin::CallableName()}, Hndl(&NDq::AnnotateDqGraceJoin));
AddHandler({TDqPhyCrossJoin::CallableName()}, Hndl(&NDq::AnnotateDqCrossJoin));
AddHandler({TDqPhyJoinDict::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin));
AddHandler({TDqSink::CallableName()}, Hndl(&NDq::AnnotateDqSink));