diff options
author | aakulaga <aakulaga@ydb.tech> | 2022-09-16 15:03:55 +0300 |
---|---|---|
committer | aakulaga <aakulaga@ydb.tech> | 2022-09-16 15:03:55 +0300 |
commit | 97475a8905bc20045b5157a38f3cd7d7793b67ac (patch) | |
tree | 48e72d6125d7e7b805aa94a64ddfa59788a2c37e | |
parent | 1bfaafcb1adce88e7a6161dba40b7769a04debb3 (diff) | |
download | ydb-97475a8905bc20045b5157a38f3cd7d7793b67ac.tar.gz |
Grace Join type annotation added
Grace Join type annotation added
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_core.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_impl.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_join.cpp | 153 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp | 13 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp | 38 |
5 files changed, 194 insertions, 12 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index dabc56eee1f..ae54fa1648e 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -11304,6 +11304,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["JoinDict"] = &JoinDictWrapper; Functions["MapJoinCore"] = &MapJoinCoreWrapper; Functions["CommonJoinCore"] = &CommonJoinCoreWrapper; + Functions["GraceJoinCore"] = &GraceJoinCoreWrapper; Functions["CombineCore"] = &CombineCoreWrapper; Functions["GroupingCore"] = &GroupingCoreWrapper; Functions["HoppingTraits"] = &HoppingTraitsWrapper; diff --git a/ydb/library/yql/core/type_ann/type_ann_impl.h b/ydb/library/yql/core/type_ann/type_ann_impl.h index 3d940ebd737..31677f60ade 100644 --- a/ydb/library/yql/core/type_ann/type_ann_impl.h +++ b/ydb/library/yql/core/type_ann/type_ann_impl.h @@ -28,6 +28,7 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus JoinWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus JoinDictWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus MapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus GraceJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus CommonJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus EquiJoinWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus CombineCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); diff --git a/ydb/library/yql/core/type_ann/type_ann_join.cpp b/ydb/library/yql/core/type_ann/type_ann_join.cpp index 18516a8ab8b..cd7c8f54ec1 100644 --- a/ydb/library/yql/core/type_ann/type_ann_join.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_join.cpp @@ -522,6 +522,159 @@ namespace NTypeAnnImpl { } } + + IGraphTransformer::TStatus GraceJoinCoreWrapperImp(const TExprNode::TPtr& input, const TMultiExprType& leftTupleType, const TMultiExprType& rightTupleType, TContext& ctx) { + + if (!EnsureAtom(*input->Child(2), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + const auto joinKind = input->Child(2)->Content(); + + const auto& leftKeyColumns = *input->Child(3); + const auto& rightKeyColumns = *input->Child(4); + + if (!EnsureTupleOfAtoms(leftKeyColumns, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureTupleOfAtoms(rightKeyColumns, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + + const auto& leftRenames = *input->Child(5); + const auto& rightRenames = *input->Child(6); + + if (!EnsureTupleOfAtoms(leftRenames, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (leftRenames.ChildrenSize() % 2 != 0) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(leftRenames.Pos()), TStringBuilder() << "Expected even count of atoms")); + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureTupleOfAtoms(rightRenames, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (rightRenames.ChildrenSize() % 2 != 0) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(rightRenames.Pos()), TStringBuilder() << "Expected even count of atoms")); + return IGraphTransformer::TStatus::Error; + } + + const auto outputSize = (leftRenames.ChildrenSize() + rightRenames.ChildrenSize()) / 2; + TVector<const TTypeAnnotationNode*> resultItems; + resultItems.resize(outputSize); + + THashSet<TStringBuf> outputColumns; + outputColumns.reserve(outputSize); + + for (ui32 i = 0; i < leftRenames.ChildrenSize(); i += 2) { + const auto oldName = leftRenames.Child(i); + const auto newName = leftRenames.Child(i + 1); + + const auto oldPos = GetFieldPosition(leftTupleType, oldName->Content()); + if (!oldPos) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(oldName->Pos()), TStringBuilder() << "Unknown column: " << oldName->Content())); + return IGraphTransformer::TStatus::Error; + } + + if (newName->Content().empty()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(newName->Pos()), "Empty column is not allowed")); + return IGraphTransformer::TStatus::Error; + } + + if (!outputColumns.emplace(newName->Content()).second) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(newName->Pos()), TStringBuilder() << "Duplicate output field: " << newName->Content())); + return IGraphTransformer::TStatus::Error; + } + + const auto columnType = GetFieldType(leftTupleType, *oldPos); + + if (ui32 index; !TryFromString(newName->Content(), index) || index >= resultItems.size()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(newName->Pos()), TStringBuilder() << "Invalid output field index: " << newName->Content())); + return IGraphTransformer::TStatus::Error; + } else { + resultItems[index] = columnType; + } + + } + + for (ui32 i = 0; i < rightRenames.ChildrenSize(); i += 2) { + const auto oldName = rightRenames.Child(i); + const auto newName = rightRenames.Child(i + 1); + + const auto oldPos = GetFieldPosition(rightTupleType, oldName->Content()); + if (!oldPos) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(oldName->Pos()), TStringBuilder() << "Unknown column: " << oldName->Content())); + return IGraphTransformer::TStatus::Error; + } + + if (newName->Content().empty()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(newName->Pos()), "Empty column is not allowed")); + return IGraphTransformer::TStatus::Error; + } + + if (!outputColumns.emplace(newName->Content()).second) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(newName->Pos()), TStringBuilder() << "Duplicate output field: " << newName->Content())); + return IGraphTransformer::TStatus::Error; + } + + auto columnType = GetFieldType(rightTupleType, *oldPos); + if (joinKind == "Left" && !columnType->IsOptionalOrNull()) { + columnType = ctx.Expr.MakeType<TOptionalExprType>(columnType); + } + + if (ui32 index; !TryFromString(newName->Content(), index) || index >= resultItems.size()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(newName->Pos()), TStringBuilder() << "Invalid output field index: " << newName->Content())); + return IGraphTransformer::TStatus::Error; + } else { + resultItems[index] = columnType; + } + + } + + const auto resultItemType = ctx.Expr.MakeType<TMultiExprType>(resultItems); + if (!resultItemType->Validate(input->Pos(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(resultItemType)); + return IGraphTransformer::TStatus::Ok; + } + + + IGraphTransformer::TStatus GraceJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + + if (!EnsureArgsCount(*input, 7, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + const TTypeAnnotationNode* leftItemType = nullptr; + if (!EnsureNewSeqType<true>(input->Head(), ctx.Expr, &leftItemType)) { + return IGraphTransformer::TStatus::Error; + } + + const TTypeAnnotationNode* rightItemType = nullptr; + if (!EnsureNewSeqType<true>(*input->Child(1), ctx.Expr, &rightItemType)) { + return IGraphTransformer::TStatus::Error; + } + + if ( !EnsureWideFlowType(*input->Child(0), ctx.Expr) ) { + return IGraphTransformer::TStatus::Error; + } + + if ( !EnsureWideFlowType(*input->Child(1), ctx.Expr) ) { + return IGraphTransformer::TStatus::Error; + } + + return GraceJoinCoreWrapperImp(input, *leftItemType->Cast<TMultiExprType>(), *rightItemType->Cast<TMultiExprType>(), ctx); + + } + + template<class TInputType> IGraphTransformer::TStatus CommonJoinCoreWrapperT(const TExprNode::TPtr& input, const TInputType& inputItemType, TContext& ctx) { constexpr bool ByStruct = std::is_same<TInputType, TStructExprType>::value; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 88d9e615ba5..895efd42e54 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -595,17 +595,8 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr EFetchResult GraceJoinState::DoCalculateWrapper(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { -/* - const auto statePtr = static_cast<TState*>(state.AsBoxed().Get()); - - auto & JoinCompleted = statePtr->State->JoinCompleted; - auto & LeftPacker = statePtr->State->LeftPacker; - auto & RightPacker = statePtr->State->RightPacker; - auto & JoinedTablePtr = statePtr->State->JoinedTablePtr; - auto & JoinedTuple = statePtr->State->JoinedTuple; -*/ - while (!*JoinCompleted) { + while (!*JoinCompleted) { const NKikimr::NMiniKQL::EFetchResult resultLeft = FlowLeft->FetchValues(ctx, LeftPacker->TuplePtrs.data()); const NKikimr::NMiniKQL::EFetchResult resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data()); @@ -624,7 +615,7 @@ EFetchResult GraceJoinState::DoCalculateWrapper(TComputationContext& ctx, NUdf:: return EFetchResult::Yield; } - if (resultRight == EFetchResult::Finish && resultRight == EFetchResult::Finish && !*JoinCompleted) { + if (resultRight == EFetchResult::Finish && resultLeft == EFetchResult::Finish && !*JoinCompleted) { *JoinCompleted = true; JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind); JoinedTablePtr->ResetIterator(); diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 2025958d96c..b242d345e3f 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -1435,7 +1435,43 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType); }); -// TODO: Add GraceJoinCore callable here + AddCallable("GraceJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) { + const auto flowLeft = MkqlBuildExpr(*node.Child(0), ctx); + const auto flowRight = MkqlBuildExpr(*node.Child(1), ctx); + const auto joinKind = GetJoinKind(node, node.Child(2)->Content()); + + const auto outputItemType = GetSeqItemType(node.GetTypeAnn()); + + std::vector<ui32> leftKeyColumns, rightKeyColumns, leftRenames, rightRenames; + const auto leftItemType = GetSeqItemType(node.Child(0)->GetTypeAnn()); + const auto rightItemType = GetSeqItemType(node.Child(1)->GetTypeAnn()); + + if (leftItemType->GetKind() != ETypeAnnotationKind::Multi || + rightItemType->GetKind() != ETypeAnnotationKind::Multi ) { + ythrow TNodeException(node) << "Wrong GraceJoinCore input item type: " << *leftItemType << " " << *rightItemType; + + } + + if (outputItemType->GetKind() != ETypeAnnotationKind::Multi ) { + ythrow TNodeException(node) << "Wrong GraceJoinCore output item type: " << *outputItemType; + + } + + const auto leftTupleType = leftItemType->Cast<TMultiExprType>(); + const auto rightTupleType = leftItemType->Cast<TMultiExprType>(); + const auto outputTupleType = outputItemType->Cast<TMultiExprType>(); + + node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetFieldPosition(*leftTupleType, child.Content())); }); + node.Child(4)->ForEachChild([&](const TExprNode& child){ rightKeyColumns.emplace_back(*GetFieldPosition(*rightTupleType, child.Content())); }); + bool s = false; + node.Child(5)->ForEachChild([&](const TExprNode& child){ leftRenames.emplace_back(*GetFieldPosition((s = !s) ? *leftTupleType : *outputTupleType, child.Content())); }); + s = false; + node.Child(6)->ForEachChild([&](const TExprNode& child){ rightRenames.emplace_back(*GetFieldPosition((s = !s) ? *rightTupleType : *outputTupleType, child.Content())); }); + + const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + return ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType); + }); + AddCallable("CommonJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) { const auto list = MkqlBuildExpr(node.Head(), ctx); |