aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: aakulaga <aakulaga@ydb.tech>, 2022-09-16 15:03:55 +0300
committer: aakulaga <aakulaga@ydb.tech>, 2022-09-16 15:03:55 +0300
commit: 97475a8905bc20045b5157a38f3cd7d7793b67ac (patch)
tree: 48e72d6125d7e7b805aa94a64ddfa59788a2c37e
parent: 1bfaafcb1adce88e7a6161dba40b7769a04debb3 (diff)
download: ydb-97475a8905bc20045b5157a38f3cd7d7793b67ac.tar.gz
Grace Join type annotation added
Grace Join type annotation added
-rw-r--r-- ydb/library/yql/core/type_ann/type_ann_core.cpp (1 line changed)
-rw-r--r-- ydb/library/yql/core/type_ann/type_ann_impl.h (1 line changed)
-rw-r--r-- ydb/library/yql/core/type_ann/type_ann_join.cpp (153 lines changed)
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp (13 lines changed)
-rw-r--r-- ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp (38 lines changed)
5 files changed, 194 insertions, 12 deletions
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index dabc56eee1f..ae54fa1648e 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -11304,6 +11304,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["JoinDict"] = &JoinDictWrapper;
Functions["MapJoinCore"] = &MapJoinCoreWrapper;
Functions["CommonJoinCore"] = &CommonJoinCoreWrapper;
+ Functions["GraceJoinCore"] = &GraceJoinCoreWrapper;
Functions["CombineCore"] = &CombineCoreWrapper;
Functions["GroupingCore"] = &GroupingCoreWrapper;
Functions["HoppingTraits"] = &HoppingTraitsWrapper;
diff --git a/ydb/library/yql/core/type_ann/type_ann_impl.h b/ydb/library/yql/core/type_ann/type_ann_impl.h
index 3d940ebd737..31677f60ade 100644
--- a/ydb/library/yql/core/type_ann/type_ann_impl.h
+++ b/ydb/library/yql/core/type_ann/type_ann_impl.h
@@ -28,6 +28,7 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus JoinWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus JoinDictWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus MapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ IGraphTransformer::TStatus GraceJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus CommonJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus EquiJoinWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus CombineCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
diff --git a/ydb/library/yql/core/type_ann/type_ann_join.cpp b/ydb/library/yql/core/type_ann/type_ann_join.cpp
index 18516a8ab8b..cd7c8f54ec1 100644
--- a/ydb/library/yql/core/type_ann/type_ann_join.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_join.cpp
@@ -522,6 +522,159 @@ namespace NTypeAnnImpl {
}
}
+
+ // Type annotation for GraceJoinCore, called once both inputs are known to be
+ // wide flows whose item types are leftTupleType / rightTupleType.
+ //
+ // Children of `input` inspected here:
+ //   2    - join kind atom;
+ //   3, 4 - tuples of atoms naming the left / right key columns;
+ //   5, 6 - tuples of atoms holding (source column, output column) pairs for
+ //          the left / right side.
+ // Source and key columns are resolved with GetFieldPosition against the
+ // matching input type; every output column must parse as an index below the
+ // total number of renamed columns.
+ //
+ // On success the node is annotated as Flow<Multi<...>> and Ok is returned.
+ // On any violation an issue is added to ctx.Expr and Error is returned.
+ IGraphTransformer::TStatus GraceJoinCoreWrapperImp(const TExprNode::TPtr& input, const TMultiExprType& leftTupleType, const TMultiExprType& rightTupleType, TContext& ctx) {
+     if (!EnsureAtom(*input->Child(2), ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     const auto joinKind = input->Child(2)->Content();
+
+     const auto& leftKeyColumns = *input->Child(3);
+     const auto& rightKeyColumns = *input->Child(4);
+
+     if (!EnsureTupleOfAtoms(leftKeyColumns, ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     if (!EnsureTupleOfAtoms(rightKeyColumns, ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     // Keys are compared pairwise, so both sides must list the same number of them.
+     if (leftKeyColumns.ChildrenSize() != rightKeyColumns.ChildrenSize()) {
+         ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(rightKeyColumns.Pos()), TStringBuilder() << "Mismatch of key columns count: "
+             << leftKeyColumns.ChildrenSize() << " != " << rightKeyColumns.ChildrenSize()));
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     // Every key must exist in its input: later stages resolve key positions
+     // without re-checking them.
+     const auto checkKeys = [&ctx](const TExprNode& keys, const TMultiExprType& tupleType) {
+         for (ui32 i = 0; i < keys.ChildrenSize(); ++i) {
+             const auto key = keys.Child(i);
+             if (!GetFieldPosition(tupleType, key->Content())) {
+                 ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(key->Pos()), TStringBuilder() << "Unknown column: " << key->Content()));
+                 return false;
+             }
+         }
+         return true;
+     };
+
+     if (!checkKeys(leftKeyColumns, leftTupleType) || !checkKeys(rightKeyColumns, rightTupleType)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     const auto& leftRenames = *input->Child(5);
+     const auto& rightRenames = *input->Child(6);
+
+     if (!EnsureTupleOfAtoms(leftRenames, ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     if (leftRenames.ChildrenSize() % 2 != 0) {
+         ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(leftRenames.Pos()), TStringBuilder() << "Expected even count of atoms"));
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     if (!EnsureTupleOfAtoms(rightRenames, ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     if (rightRenames.ChildrenSize() % 2 != 0) {
+         ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(rightRenames.Pos()), TStringBuilder() << "Expected even count of atoms"));
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     // One output slot per rename pair; a slot stays nullptr until it is assigned.
+     const auto outputSize = (leftRenames.ChildrenSize() + rightRenames.ChildrenSize()) / 2;
+     TVector<const TTypeAnnotationNode*> resultItems;
+     resultItems.resize(outputSize);
+
+     THashSet<TStringBuf> outputColumns;
+     outputColumns.reserve(outputSize);
+
+     // Validates one side's rename pairs and stores the resulting column types
+     // into resultItems. Returns false after reporting an issue.
+     const auto applyRenames = [&](const TExprNode& renames, const TMultiExprType& sourceType, bool makeOptional) {
+         for (ui32 i = 0; i < renames.ChildrenSize(); i += 2) {
+             const auto oldName = renames.Child(i);
+             const auto newName = renames.Child(i + 1);
+
+             const auto oldPos = GetFieldPosition(sourceType, oldName->Content());
+             if (!oldPos) {
+                 ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(oldName->Pos()), TStringBuilder() << "Unknown column: " << oldName->Content()));
+                 return false;
+             }
+
+             if (newName->Content().empty()) {
+                 ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(newName->Pos()), "Empty column is not allowed"));
+                 return false;
+             }
+
+             if (!outputColumns.emplace(newName->Content()).second) {
+                 ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(newName->Pos()), TStringBuilder() << "Duplicate output field: " << newName->Content()));
+                 return false;
+             }
+
+             ui32 index = 0;
+             if (!TryFromString(newName->Content(), index) || index >= resultItems.size()) {
+                 ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(newName->Pos()), TStringBuilder() << "Invalid output field index: " << newName->Content()));
+                 return false;
+             }
+
+             // Different spellings of the same index (e.g. "0" and "00") pass the
+             // string check above. Reject them here, otherwise another slot would
+             // be left as nullptr in the resulting multi type.
+             if (resultItems[index]) {
+                 ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(newName->Pos()), TStringBuilder() << "Duplicate output field: " << newName->Content()));
+                 return false;
+             }
+
+             const TTypeAnnotationNode* columnType = GetFieldType(sourceType, *oldPos);
+             if (makeOptional && !columnType->IsOptionalOrNull()) {
+                 columnType = ctx.Expr.MakeType<TOptionalExprType>(columnType);
+             }
+
+             resultItems[index] = columnType;
+         }
+         return true;
+     };
+
+     // Left-side columns keep their types. Right-side columns become optional
+     // for the "Left" kind only.
+     // NOTE(review): other join kinds are not given optional columns here -
+     // confirm this matches what the runtime produces for them.
+     if (!applyRenames(leftRenames, leftTupleType, false)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     if (!applyRenames(rightRenames, rightTupleType, joinKind == "Left")) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     const auto resultItemType = ctx.Expr.MakeType<TMultiExprType>(resultItems);
+     if (!resultItemType->Validate(input->Pos(), ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(resultItemType));
+     return IGraphTransformer::TStatus::Ok;
+ }
+
+
+ // Type-annotation entry point for the GraceJoinCore callable.
+ //
+ // Requires exactly seven children. The first two must pass
+ // EnsureNewSeqType<true> and EnsureWideFlowType; their item types are then
+ // passed as multi types to GraceJoinCoreWrapperImp, which checks the rest.
+ // `output` is not used.
+ IGraphTransformer::TStatus GraceJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+     Y_UNUSED(output);
+
+     if (!EnsureArgsCount(*input, 7, ctx.Expr)) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     const auto& leftFlow = input->Head();
+     const auto& rightFlow = *input->Child(1);
+
+     // Both inputs are checked as sequences first, which also yields their item types.
+     const TTypeAnnotationNode* leftItem = nullptr;
+     const TTypeAnnotationNode* rightItem = nullptr;
+     const bool bothSequences = EnsureNewSeqType<true>(leftFlow, ctx.Expr, &leftItem)
+         && EnsureNewSeqType<true>(rightFlow, ctx.Expr, &rightItem);
+     if (!bothSequences) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     // The wide-flow check runs before the item types are cast to TMultiExprType below.
+     const bool bothWide = EnsureWideFlowType(leftFlow, ctx.Expr)
+         && EnsureWideFlowType(rightFlow, ctx.Expr);
+     if (!bothWide) {
+         return IGraphTransformer::TStatus::Error;
+     }
+
+     const auto& leftMulti = *leftItem->Cast<TMultiExprType>();
+     const auto& rightMulti = *rightItem->Cast<TMultiExprType>();
+     return GraceJoinCoreWrapperImp(input, leftMulti, rightMulti, ctx);
+ }
+
+
template<class TInputType>
IGraphTransformer::TStatus CommonJoinCoreWrapperT(const TExprNode::TPtr& input, const TInputType& inputItemType, TContext& ctx) {
constexpr bool ByStruct = std::is_same<TInputType, TStructExprType>::value;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
index 88d9e615ba5..895efd42e54 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
@@ -595,17 +595,8 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
EFetchResult GraceJoinState::DoCalculateWrapper(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
-/*
- const auto statePtr = static_cast<TState*>(state.AsBoxed().Get());
-
- auto & JoinCompleted = statePtr->State->JoinCompleted;
- auto & LeftPacker = statePtr->State->LeftPacker;
- auto & RightPacker = statePtr->State->RightPacker;
- auto & JoinedTablePtr = statePtr->State->JoinedTablePtr;
- auto & JoinedTuple = statePtr->State->JoinedTuple;
-*/
- while (!*JoinCompleted) {
+ while (!*JoinCompleted) {
const NKikimr::NMiniKQL::EFetchResult resultLeft = FlowLeft->FetchValues(ctx, LeftPacker->TuplePtrs.data());
const NKikimr::NMiniKQL::EFetchResult resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data());
@@ -624,7 +615,7 @@ EFetchResult GraceJoinState::DoCalculateWrapper(TComputationContext& ctx, NUdf::
return EFetchResult::Yield;
}
- if (resultRight == EFetchResult::Finish && resultRight == EFetchResult::Finish && !*JoinCompleted) {
+ if (resultRight == EFetchResult::Finish && resultLeft == EFetchResult::Finish && !*JoinCompleted) {
*JoinCompleted = true;
JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind);
JoinedTablePtr->ResetIterator();
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index 2025958d96c..b242d345e3f 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -1435,7 +1435,43 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType);
});
-// TODO: Add GraceJoinCore callable here
+ // Compiles a GraceJoinCore expression node into a ProgramBuilder.GraceJoin call.
+ //
+ // Children of `node`: 0/1 - left/right input flows, 2 - join kind,
+ // 3/4 - left/right key columns, 5/6 - left/right (source, output) rename pairs.
+ // Column names are turned into positions inside the matching multi type.
+ // Throws TNodeException when an item type is not Multi or a column cannot be
+ // resolved.
+ AddCallable("GraceJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+     const auto flowLeft = MkqlBuildExpr(*node.Child(0), ctx);
+     const auto flowRight = MkqlBuildExpr(*node.Child(1), ctx);
+     const auto joinKind = GetJoinKind(node, node.Child(2)->Content());
+
+     const auto outputItemType = GetSeqItemType(node.GetTypeAnn());
+     const auto leftItemType = GetSeqItemType(node.Child(0)->GetTypeAnn());
+     const auto rightItemType = GetSeqItemType(node.Child(1)->GetTypeAnn());
+
+     if (leftItemType->GetKind() != ETypeAnnotationKind::Multi ||
+         rightItemType->GetKind() != ETypeAnnotationKind::Multi) {
+         ythrow TNodeException(node) << "Wrong GraceJoinCore input item type: " << *leftItemType << " " << *rightItemType;
+     }
+
+     if (outputItemType->GetKind() != ETypeAnnotationKind::Multi) {
+         ythrow TNodeException(node) << "Wrong GraceJoinCore output item type: " << *outputItemType;
+     }
+
+     const auto leftTupleType = leftItemType->Cast<TMultiExprType>();
+     // The right side is resolved against the right input's own item type
+     // (this used to be taken from the left input by mistake).
+     const auto rightTupleType = rightItemType->Cast<TMultiExprType>();
+     const auto outputTupleType = outputItemType->Cast<TMultiExprType>();
+
+     // Resolves a column atom to its position, failing with a positioned error
+     // instead of dereferencing an empty lookup result.
+     const auto resolveColumn = [&node](const TMultiExprType& type, const TExprNode& column) -> ui32 {
+         const auto position = GetFieldPosition(type, column.Content());
+         if (!position) {
+             ythrow TNodeException(node) << "Unknown GraceJoinCore column: " << column.Content();
+         }
+         return *position;
+     };
+
+     std::vector<ui32> leftKeyColumns, rightKeyColumns, leftRenames, rightRenames;
+
+     node.Child(3)->ForEachChild([&](const TExprNode& child) { leftKeyColumns.emplace_back(resolveColumn(*leftTupleType, child)); });
+     node.Child(4)->ForEachChild([&](const TExprNode& child) { rightKeyColumns.emplace_back(resolveColumn(*rightTupleType, child)); });
+
+     // Renames are flat (source, output) pairs: even entries are looked up in the
+     // input type, odd entries in the output type.
+     const auto collectRenames = [&](const TExprNode& renames, const TMultiExprType& sourceType, std::vector<ui32>& positions) {
+         for (ui32 i = 0; i < renames.ChildrenSize(); ++i) {
+             const auto& lookupType = (i % 2 == 0) ? sourceType : *outputTupleType;
+             positions.emplace_back(resolveColumn(lookupType, *renames.Child(i)));
+         }
+     };
+
+     collectRenames(*node.Child(5), *leftTupleType, leftRenames);
+     collectRenames(*node.Child(6), *rightTupleType, rightRenames);
+
+     const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
+     return ctx.ProgramBuilder.GraceJoin(flowLeft, flowRight, joinKind, leftKeyColumns, rightKeyColumns, leftRenames, rightRenames, returnType);
+ });
+
AddCallable("CommonJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) {
const auto list = MkqlBuildExpr(node.Head(), ctx);