diff options
author | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-02-17 23:25:34 +0300 |
---|---|---|
committer | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-02-17 23:43:14 +0300 |
commit | 8fe93946bc369873a7ffbb3a7403463aa80e3117 (patch) | |
tree | 9881b73381be7912e07909359d277e1016b43d2c /yql/essentials/minikql/mkql_program_builder.cpp | |
parent | 3af8bdc1b9f36eea116453da1a8b456810d3038e (diff) | |
download | ydb-8fe93946bc369873a7ffbb3a7403463aa80e3117.tar.gz |
BlockMapJoinCore refactor
* Split the storage and index parts of the BlockMapJoinCore computation node into separate BlockStorage and BlockMapJoinIndex nodes, so that multiple join nodes can reuse the same block data and index for the right table where possible
* Corresponding s-expression changes
commit_hash:40e39fb0b22c2f929c184963b5bd901006122c14
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r-- | yql/essentials/minikql/mkql_program_builder.cpp | 80 |
1 files changed, 69 insertions, 11 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index 717eb9ec32..b139c24058 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -321,6 +321,11 @@ void EnsureDataOrOptionalOfData(TRuntimeNode node) { ->GetItemType()->IsData(), "Expected data or optional of data"); } +std::vector<TType*> ValidateBlockType(const TType* type, bool unwrap) { + const auto wideComponents = AS_TYPE(TMultiType, type)->GetElements(); + return ValidateBlockItems(wideComponents, unwrap); +} + std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap) { const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, streamType)); return ValidateBlockItems(wideComponents, unwrap); @@ -6009,29 +6014,82 @@ TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& a return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind, +TRuntimeNode TProgramBuilder::BlockStorage(TRuntimeNode stream, TType* returnType) { + if constexpr (RuntimeVersion < 59U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + ValidateBlockStreamType(stream.GetStaticType()); + + MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type"); + auto returnResourceType = AS_TYPE(TResourceType, returnType); + MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(stream); + + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::BlockMapJoinIndex(TRuntimeNode blockStorage, TType* streamItemType, const TArrayRef<const ui32>& keyColumns, bool any, TType* returnType) { + if constexpr (RuntimeVersion < 59U) { + 
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + MKQL_ENSURE(blockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type"); + auto blockStorageType = AS_TYPE(TResourceType, blockStorage.GetStaticType()); + MKQL_ENSURE(blockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + ValidateBlockType(streamItemType); + + MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type"); + auto returnResourceType = AS_TYPE(TResourceType, returnType); + MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + + TRuntimeNode::TList keyColumnsNodes; + keyColumnsNodes.reserve(keyColumns.size()); + std::transform(keyColumns.cbegin(), keyColumns.cend(), + std::back_inserter(keyColumnsNodes), [this](const ui32 idx) { + return NewDataLiteral(idx); + }); + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(blockStorage); + callableBuilder.Add(TRuntimeNode(streamItemType, true)); + callableBuilder.Add(NewTuple(keyColumnsNodes)); + callableBuilder.Add(NewDataLiteral((bool)any)); + + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightBlockStorage, TType* rightStreamItemType, EJoinKind joinKind, const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& leftKeyDrops, - const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, bool rightAny, TType* returnType + const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, TType* returnType ) { - if constexpr (RuntimeVersion < 53U) { + if constexpr (RuntimeVersion < 59U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - if (RuntimeVersion < 57U && joinKind == EJoinKind::Cross) { - THROW 
yexception() << __func__ << " does not support cross join in runtime version (" << RuntimeVersion << ")"; + + MKQL_ENSURE(rightBlockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type"); + auto rightBlockStorageType = AS_TYPE(TResourceType, rightBlockStorage.GetStaticType()); + if (joinKind != EJoinKind::Cross) { + MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + } else { + MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); } MKQL_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross, "Unsupported join kind"); MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key column count mismatch"); - if (joinKind == EJoinKind::Cross) { - MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join"); - } else { + if (joinKind != EJoinKind::Cross) { MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified"); + } else { + MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join"); } ValidateBlockStreamType(leftStream.GetStaticType()); - ValidateBlockStreamType(rightStream.GetStaticType()); + ValidateBlockType(rightStreamItemType); ValidateBlockStreamType(returnType); TRuntimeNode::TList leftKeyColumnsNodes; @@ -6064,13 +6122,13 @@ TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntime TCallableBuilder callableBuilder(Env, __func__, returnType); callableBuilder.Add(leftStream); - callableBuilder.Add(rightStream); + callableBuilder.Add(rightBlockStorage); + callableBuilder.Add(TRuntimeNode(rightStreamItemType, true)); callableBuilder.Add(NewDataLiteral((ui32)joinKind)); callableBuilder.Add(NewTuple(leftKeyColumnsNodes)); callableBuilder.Add(NewTuple(leftKeyDropsNodes)); 
callableBuilder.Add(NewTuple(rightKeyColumnsNodes)); callableBuilder.Add(NewTuple(rightKeyDropsNodes)); - callableBuilder.Add(NewDataLiteral((bool)rightAny)); return TRuntimeNode(callableBuilder.Build(), false); } |