diff options
author | aakulaga <aakulaga@ydb.tech> | 2023-07-12 21:00:37 +0300 |
---|---|---|
committer | aakulaga <aakulaga@ydb.tech> | 2023-07-12 21:00:37 +0300 |
commit | dba71ef6b2793b48a7a459fc452ac0ecf80babb1 (patch) | |
tree | 2e406cb09205b32e34e0abd2dd88d0838bfd699b | |
parent | 0eae0451d1b559aedec887598d5807fb984511b5 (diff) | |
download | ydb-dba71ef6b2793b48a7a459fc452ac0ecf80babb1.tar.gz |
SelfJoin Node added
SelfJoin Node added
CreateGraceJoin
6 files changed, 193 insertions, 11 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index 26905d3bf2..0f20f922ee 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -228,6 +228,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"ToIndexDict", &WrapToIndexDict}, {"JoinDict", &WrapJoinDict}, {"GraceJoin", &WrapGraceJoin}, + {"SelfJoin", &WrapSelfJoin}, {"MapJoinCore", &WrapMapJoinCore}, {"CommonJoinCore", &WrapCommonJoinCore}, {"CombineCore", &WrapCombineCore}, diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index f5c4a38f0d..f9a9fdf3d7 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -562,7 +562,8 @@ public: IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight, EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns, const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames, - const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory) + const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory, + const bool isSelfJoin) : TBase(memInfo) , FlowLeft(flowLeft) , FlowRight(flowRight) @@ -578,6 +579,7 @@ public: , HaveMoreLeftRows(std::make_unique<bool>(true)) , HaveMoreRightRows(std::make_unique<bool>(true)) , JoinedTuple(std::make_unique<std::vector<NUdf::TUnboxedValue*>>() ) + , IsSelfJoin_(isSelfJoin) { if (JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion ) { LeftPacker->BatchSize = std::numeric_limits<ui64>::max(); @@ -603,6 +605,7 @@ private: const std::unique_ptr<bool> HaveMoreLeftRows; const std::unique_ptr<bool> HaveMoreRightRows; const std::unique_ptr<std::vector<NUdf::TUnboxedValue*>> JoinedTuple; + const bool IsSelfJoin_; }; class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWrapper> { @@ -613,7 +616,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, std::vector<ui32>&& leftKeyColumns, std::vector<ui32>&& rightKeyColumns, std::vector<ui32>&& leftRenames, std::vector<ui32>&& rightRenames, std::vector<TType*>&& leftColumnsTypes, std::vector<TType*>&& rightColumnsTypes, - std::vector<EValueRepresentation>&& outputRepresentations) + std::vector<EValueRepresentation>&& outputRepresentations, bool isSelfJoin) : TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed) , FlowLeft(flowLeft) , FlowRight(flowRight) @@ -626,6 +629,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr , LeftColumnsTypes(std::move(leftColumnsTypes)) , RightColumnsTypes(std::move(rightColumnsTypes)) , OutputRepresentations(std::move(outputRepresentations)) + , IsSelfJoin_(isSelfJoin) {} EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -727,7 +731,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr state = ctx.HolderFactory.Create<TGraceJoinState>( FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns, LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes, - ctx.HolderFactory); + ctx.HolderFactory, IsSelfJoin_); } IComputationWideFlowNode *const FlowLeft; @@ -741,6 +745,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr const std::vector<TType *> LeftColumnsTypes; const std::vector<TType *> RightColumnsTypes; const std::vector<EValueRepresentation> OutputRepresentations; + const bool IsSelfJoin_; }; EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { @@ -751,7 +756,7 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox if ( *PartialJoinCompleted) { // Returns join results (batch or full) - JoinedTuple->resize((LeftRenames.size() + RightRenames.size()) / 2); + while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) { LeftPacker->UnPack(); @@ -808,7 +813,14 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox const NKikimr::NMiniKQL::EFetchResult resultLeft = FlowLeft->FetchValues(ctx, LeftPacker->TuplePtrs.data()); - const NKikimr::NMiniKQL::EFetchResult resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data()); + + NKikimr::NMiniKQL::EFetchResult resultRight; + + if (IsSelfJoin_) { + resultRight = resultLeft; + } else { + resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data()); + } if (resultLeft == EFetchResult::One) { if (LeftPacker->TuplesPacked == 0) { @@ -823,8 +835,10 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox RightPacker->StartTime = std::chrono::system_clock::now(); } - RightPacker->Pack(); - RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data(), RightPacker->IColumnsHolder.data()); + if ( !IsSelfJoin_ ) { + RightPacker->Pack(); + RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data(), RightPacker->IColumnsHolder.data()); + } } if (resultLeft == EFetchResult::Yield || resultRight == EFetchResult::Yield) { @@ -859,7 +873,11 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox *PartialJoinCompleted = true; LeftPacker->StartTime = std::chrono::system_clock::now(); RightPacker->StartTime = std::chrono::system_clock::now(); - JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); + if ( IsSelfJoin_ ) { + JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); + } else { + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); + } JoinedTablePtr->ResetIterator(); LeftPacker->EndTime = std::chrono::system_clock::now(); RightPacker->EndTime = std::chrono::system_clock::now(); @@ -874,8 +892,67 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox } +IComputationNode* CreateGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx, bool isSelfJoin = false) { + + MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); + + const auto leftFlowNode = callable.GetInput(0); + const auto rightFlowNode = callable.GetInput(1); + const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode)); + const auto rightFlowComponents = GetWideComponents(AS_TYPE(TFlowType, rightFlowNode)); + const auto joinKindNode = callable.GetInput(2); + const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3)); + const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(4)); + const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5)); + const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(6)); + const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>(); + + const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(7))->AsValue().Get<ui32>()); + + const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0)); + const auto flowRight = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 1)); + + const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); + std::vector<EValueRepresentation> outputRepresentations; + outputRepresentations.reserve(outputFlowComponents.size()); + for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) { + outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i])); + } + + std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames; + std::vector<TType*> leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end()); + std::vector<TType*> rightColumnsTypes(rightFlowComponents.begin(), rightFlowComponents.end()); + + leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount()); + for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) { + leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>()); + } + + leftRenames.reserve(leftRenamesNode->GetValuesCount()); + for (ui32 i = 0; i < leftRenamesNode->GetValuesCount(); ++i) { + leftRenames.emplace_back(AS_VALUE(TDataLiteral, leftRenamesNode->GetValue(i))->AsValue().Get<ui32>()); + } + + rightKeyColumns.reserve(rightKeyColumnsNode->GetValuesCount()); + for (ui32 i = 0; i < rightKeyColumnsNode->GetValuesCount(); ++i) { + rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>()); + } + + rightRenames.reserve(rightRenamesNode->GetValuesCount()); + for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) { + rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>()); + } + + return new TGraceJoinWrapper( + ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings, + std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), isSelfJoin); + +} + IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); + + MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); const auto leftFlowNode = callable.GetInput(0); const auto rightFlowNode = callable.GetInput(1); @@ -927,10 +1004,73 @@ IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFacto return new TGraceJoinWrapper( ctx.Mutables, flowLeft, flowRight, GetJoinKind(rawJoinKind), anyJoinSettings, std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), - std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations)); + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), false); + +} + +IComputationNode* WrapSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + + MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args"); + + const auto leftFlowNode = callable.GetInput(0); + const auto leftFlowComponents = GetWideComponents(AS_TYPE(TFlowType, leftFlowNode)); + const auto rightFlowComponents{leftFlowComponents}; + const auto joinKindNode = callable.GetInput(1); + const auto leftKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(2)); + const auto rightKeyColumnsNode = AS_VALUE(TTupleLiteral, callable.GetInput(3)); + const auto leftRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(4)); + const auto rightRenamesNode = AS_VALUE(TTupleLiteral, callable.GetInput(5)); + const ui32 rawJoinKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>(); + + const EAnyJoinSettings anyJoinSettings = GetAnyJoinSettings(AS_VALUE(TDataLiteral, callable.GetInput(6))->AsValue().Get<ui32>()); + + const auto flowLeft = dynamic_cast<IComputationWideFlowNode*> (LocateNode(ctx.NodeLocator, callable, 0)); + + const auto outputFlowComponents = GetWideComponents(AS_TYPE(TFlowType, callable.GetType()->GetReturnType())); + std::vector<EValueRepresentation> outputRepresentations; + outputRepresentations.reserve(outputFlowComponents.size()); + for (ui32 i = 0U; i < outputFlowComponents.size(); ++i) { + outputRepresentations.emplace_back(GetValueRepresentation(outputFlowComponents[i])); + } + + std::vector<ui32> leftKeyColumns, leftRenames, rightKeyColumns, rightRenames; + std::vector<TType*> leftColumnsTypes(leftFlowComponents.begin(), leftFlowComponents.end()); + std::vector<TType*> rightColumnsTypes{leftColumnsTypes}; + + leftKeyColumns.reserve(leftKeyColumnsNode->GetValuesCount()); + for (ui32 i = 0; i < leftKeyColumnsNode->GetValuesCount(); ++i) { + leftKeyColumns.emplace_back(AS_VALUE(TDataLiteral, leftKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>()); + } + + leftRenames.reserve(leftRenamesNode->GetValuesCount()); + for (ui32 i = 0; i < leftRenamesNode->GetValuesCount(); ++i) { + leftRenames.emplace_back(AS_VALUE(TDataLiteral, leftRenamesNode->GetValue(i))->AsValue().Get<ui32>()); + } + + + rightKeyColumns.reserve(rightKeyColumnsNode->GetValuesCount()); + for (ui32 i = 0; i < rightKeyColumnsNode->GetValuesCount(); ++i) { + rightKeyColumns.emplace_back(AS_VALUE(TDataLiteral, rightKeyColumnsNode->GetValue(i))->AsValue().Get<ui32>()); + } + + MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Number of key columns for self join should be equal"); + + MKQL_ENSURE(leftKeyColumns == rightKeyColumns, "Key columns for self join should be equal"); + + rightRenames.reserve(rightRenamesNode->GetValuesCount()); + for (ui32 i = 0; i < rightRenamesNode->GetValuesCount(); ++i) { + rightRenames.emplace_back(AS_VALUE(TDataLiteral, rightRenamesNode->GetValue(i))->AsValue().Get<ui32>()); + } + + return new TGraceJoinWrapper( + ctx.Mutables, flowLeft, nullptr, GetJoinKind(rawJoinKind), anyJoinSettings, + std::move(leftKeyColumns), std::move(rightKeyColumns), std::move(leftRenames), std::move(rightRenames), + std::move(leftColumnsTypes), std::move(rightColumnsTypes), std::move(outputRepresentations), true); + } + } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h index 941311e6b8..0c0772a903 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.h @@ -5,6 +5,7 @@ namespace NKikimr { namespace NMiniKQL { IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx); } } diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index c64b5b7dbd..e85212138c 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -2023,6 +2023,44 @@ TRuntimeNode TProgramBuilder::GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flow } +TRuntimeNode TProgramBuilder::SelfJoin(TRuntimeNode flowLeft, EJoinKind joinKind, const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& rightKeyColumns, + const TArrayRef<const ui32>& leftRenames, const TArrayRef<const ui32>& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings ) { + + if constexpr (RuntimeVersion < 40U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified"); + + + TRuntimeNode::TList leftKeyColumnsNodes, rightKeyColumnsNodes, leftRenamesNodes, rightRenamesNodes; + + leftKeyColumnsNodes.reserve(leftKeyColumns.size()); + std::transform(leftKeyColumns.cbegin(), leftKeyColumns.cend(), std::back_inserter(leftKeyColumnsNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); + + rightKeyColumnsNodes.reserve(rightKeyColumns.size()); + std::transform(rightKeyColumns.cbegin(), rightKeyColumns.cend(), std::back_inserter(rightKeyColumnsNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); + + leftRenamesNodes.reserve(leftRenames.size()); + std::transform(leftRenames.cbegin(), leftRenames.cend(), std::back_inserter(leftRenamesNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); + + rightRenamesNodes.reserve(rightRenames.size()); + std::transform(rightRenames.cbegin(), rightRenames.cend(), std::back_inserter(rightRenamesNodes), [this](const ui32 idx) { return NewDataLiteral(idx); }); + + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(flowLeft); + callableBuilder.Add(NewDataLiteral((ui32)joinKind)); + callableBuilder.Add(NewTuple(leftKeyColumnsNodes)); + callableBuilder.Add(NewTuple(rightKeyColumnsNodes)); + callableBuilder.Add(NewTuple(leftRenamesNodes)); + callableBuilder.Add(NewTuple(rightRenamesNodes)); + callableBuilder.Add(NewDataLiteral((ui32)anyJoinSettings)); + + return TRuntimeNode(callableBuilder.Build(), false); + +} + TRuntimeNode TProgramBuilder::ToSortedDict(TRuntimeNode list, bool all, const TUnaryLambda& keySelector, const TUnaryLambda& payloadSelector, bool isCompact, ui64 itemsCountHint) { diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 01d7cdd85c..c6f2806398 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -461,6 +461,8 @@ public: TRuntimeNode GraceJoin(TRuntimeNode flowLeft, TRuntimeNode flowRight, EJoinKind joinKind, const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& leftRenames, const TArrayRef<const ui32>& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); + TRuntimeNode SelfJoin(TRuntimeNode flowLeft, EJoinKind joinKind, const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& rightKeyColumns, + const TArrayRef<const ui32>& leftRenames, const TArrayRef<const ui32>& rightRenames, TType* returnType, EAnyJoinSettings anyJoinSettings = EAnyJoinSettings::None); TRuntimeNode CombineCore(TRuntimeNode stream, const TUnaryLambda& keyExtractor, const TBinaryLambda& init, diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h index 7ad57a995a..d1106f6e53 100644 --- a/ydb/library/yql/minikql/mkql_runtime_version.h +++ b/ydb/library/yql/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 39U +#define MKQL_RUNTIME_VERSION 40U #endif // History: |