diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-02-18 13:35:23 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-02-18 13:35:23 +0000 |
commit | d1f5b91da822b27faad83d50ecfdd2830a1be93e (patch) | |
tree | 78df3bf535cb8a5451afa402c51cb3f8d11b4d06 /yql/essentials/minikql/mkql_program_builder.cpp | |
parent | 22bc9b81495143d67a93bf58c936c5d5a65c8e8e (diff) | |
parent | a2f16dc9eb108ecf11938c7c4275d701a3635bb7 (diff) | |
download | ydb-d1f5b91da822b27faad83d50ecfdd2830a1be93e.tar.gz |
Merge pull request #14716 from ydb-platform/merge-libs-250218-0050
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r-- | yql/essentials/minikql/mkql_program_builder.cpp | 80 |
1 files changed, 69 insertions, 11 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index 717eb9ec32..b139c24058 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -321,6 +321,11 @@ void EnsureDataOrOptionalOfData(TRuntimeNode node) { ->GetItemType()->IsData(), "Expected data or optional of data"); } +std::vector<TType*> ValidateBlockType(const TType* type, bool unwrap) { + const auto wideComponents = AS_TYPE(TMultiType, type)->GetElements(); + return ValidateBlockItems(wideComponents, unwrap); +} + std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap) { const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, streamType)); return ValidateBlockItems(wideComponents, unwrap); @@ -6009,29 +6014,82 @@ TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& a return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind, +TRuntimeNode TProgramBuilder::BlockStorage(TRuntimeNode stream, TType* returnType) { + if constexpr (RuntimeVersion < 59U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + ValidateBlockStreamType(stream.GetStaticType()); + + MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type"); + auto returnResourceType = AS_TYPE(TResourceType, returnType); + MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(stream); + + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::BlockMapJoinIndex(TRuntimeNode blockStorage, TType* streamItemType, const TArrayRef<const ui32>& keyColumns, bool any, TType* returnType) { + if constexpr (RuntimeVersion < 59U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + MKQL_ENSURE(blockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type"); + auto blockStorageType = AS_TYPE(TResourceType, blockStorage.GetStaticType()); + MKQL_ENSURE(blockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + ValidateBlockType(streamItemType); + + MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type"); + auto returnResourceType = AS_TYPE(TResourceType, returnType); + MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + + TRuntimeNode::TList keyColumnsNodes; + keyColumnsNodes.reserve(keyColumns.size()); + std::transform(keyColumns.cbegin(), keyColumns.cend(), + std::back_inserter(keyColumnsNodes), [this](const ui32 idx) { + return NewDataLiteral(idx); + }); + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(blockStorage); + callableBuilder.Add(TRuntimeNode(streamItemType, true)); + callableBuilder.Add(NewTuple(keyColumnsNodes)); + callableBuilder.Add(NewDataLiteral((bool)any)); + + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightBlockStorage, TType* rightStreamItemType, EJoinKind joinKind, const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& leftKeyDrops, - const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, bool rightAny, TType* returnType + const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, TType* returnType ) { - if constexpr (RuntimeVersion < 53U) { + if constexpr (RuntimeVersion < 59U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - if (RuntimeVersion < 57U && joinKind == EJoinKind::Cross) { - THROW yexception() << __func__ << " does not support cross join in runtime version (" << RuntimeVersion << ")"; + + MKQL_ENSURE(rightBlockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type"); + auto rightBlockStorageType = AS_TYPE(TResourceType, rightBlockStorage.GetStaticType()); + if (joinKind != EJoinKind::Cross) { + MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + } else { + MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); } MKQL_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross, "Unsupported join kind"); MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key column count mismatch"); - if (joinKind == EJoinKind::Cross) { - MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join"); - } else { + if (joinKind != EJoinKind::Cross) { MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified"); + } else { + MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join"); } ValidateBlockStreamType(leftStream.GetStaticType()); - ValidateBlockStreamType(rightStream.GetStaticType()); + ValidateBlockType(rightStreamItemType); ValidateBlockStreamType(returnType); TRuntimeNode::TList leftKeyColumnsNodes; @@ -6064,13 +6122,13 @@ TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntime TCallableBuilder callableBuilder(Env, __func__, returnType); callableBuilder.Add(leftStream); - callableBuilder.Add(rightStream); + callableBuilder.Add(rightBlockStorage); + callableBuilder.Add(TRuntimeNode(rightStreamItemType, true)); callableBuilder.Add(NewDataLiteral((ui32)joinKind)); callableBuilder.Add(NewTuple(leftKeyColumnsNodes)); callableBuilder.Add(NewTuple(leftKeyDropsNodes)); callableBuilder.Add(NewTuple(rightKeyColumnsNodes)); callableBuilder.Add(NewTuple(rightKeyDropsNodes)); - callableBuilder.Add(NewDataLiteral((bool)rightAny)); return TRuntimeNode(callableBuilder.Build(), false); } |