aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/mkql_program_builder.cpp
diff options
context:
space:
mode:
author ziganshinmr <ziganshinmr@yandex-team.com> 2025-02-17 23:25:34 +0300
committer ziganshinmr <ziganshinmr@yandex-team.com> 2025-02-17 23:43:14 +0300
commit 8fe93946bc369873a7ffbb3a7403463aa80e3117 (patch)
tree 9881b73381be7912e07909359d277e1016b43d2c /yql/essentials/minikql/mkql_program_builder.cpp
parent 3af8bdc1b9f36eea116453da1a8b456810d3038e (diff)
download ydb-8fe93946bc369873a7ffbb3a7403463aa80e3117.tar.gz
BlockMapJoinCore refactor
* Split storage and index parts from BlockMapJoinCore computation node into separate BlockStorage and BlockIndex nodes in order to allow multiple join nodes to reuse the same block data and index for the right table where possible
* Corresponding s-expression changes

commit_hash:40e39fb0b22c2f929c184963b5bd901006122c14
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r-- yql/essentials/minikql/mkql_program_builder.cpp 80
1 files changed, 69 insertions, 11 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index 717eb9ec32..b139c24058 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -321,6 +321,11 @@ void EnsureDataOrOptionalOfData(TRuntimeNode node) {
->GetItemType()->IsData(), "Expected data or optional of data");
}
+std::vector<TType*> ValidateBlockType(const TType* type, bool unwrap) { // Validates the elements of a multi type as block items; counterpart of ValidateBlockStreamType for a bare item type.
+ const auto wideComponents = AS_TYPE(TMultiType, type)->GetElements(); // `type` is accessed as a TMultiType; its elements are the items to validate.
+ return ValidateBlockItems(wideComponents, unwrap); // `unwrap` is forwarded unchanged; the result of ValidateBlockItems is returned as is.
+}
+
std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap) {
const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, streamType));
return ValidateBlockItems(wideComponents, unwrap);
@@ -6009,29 +6014,82 @@ TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& a
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind,
+TRuntimeNode TProgramBuilder::BlockStorage(TRuntimeNode stream, TType* returnType) { // Builds a callable node, named after this function via __func__, whose only argument is `stream`.
+ if constexpr (RuntimeVersion < 59U) { // Builds with RuntimeVersion below 59 throw instead of constructing the node.
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ ValidateBlockStreamType(stream.GetStaticType()); // Called for validation only; the returned item types are discarded.
+
+ MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type"); // `returnType` must be a resource type...
+ auto returnResourceType = AS_TYPE(TResourceType, returnType);
+ MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); // ...whose tag starts with BlockStorageResourcePrefix.
+
+ TCallableBuilder callableBuilder(Env, __func__, returnType); // The callable is constructed with `returnType`.
+ callableBuilder.Add(stream);
+
+ return TRuntimeNode(callableBuilder.Build(), false); // NOTE(review): the `false` flag presumably marks the node as non-immediate - confirm against TRuntimeNode.
+}
+
+TRuntimeNode TProgramBuilder::BlockMapJoinIndex(TRuntimeNode blockStorage, TType* streamItemType, const TArrayRef<const ui32>& keyColumns, bool any, TType* returnType) { // Builds a callable node, named after this function via __func__, from a block storage resource, an item type, key column indices and the `any` flag.
+ if constexpr (RuntimeVersion < 59U) { // Builds with RuntimeVersion below 59 throw instead of constructing the node.
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ MKQL_ENSURE(blockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type"); // The input node must have a resource type...
+ auto blockStorageType = AS_TYPE(TResourceType, blockStorage.GetStaticType());
+ MKQL_ENSURE(blockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); // ...whose tag starts with BlockStorageResourcePrefix.
+
+ ValidateBlockType(streamItemType); // Called for validation only; the returned item types are discarded.
+
+ MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type"); // The result must also be a resource type...
+ auto returnResourceType = AS_TYPE(TResourceType, returnType);
+ MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); // ...tagged with BlockMapJoinIndexResourcePrefix rather than the storage prefix.
+
+ TRuntimeNode::TList keyColumnsNodes;
+ keyColumnsNodes.reserve(keyColumns.size());
+ std::transform(keyColumns.cbegin(), keyColumns.cend(), // Each key column index becomes one data literal node, in the original order.
+ std::back_inserter(keyColumnsNodes), [this](const ui32 idx) {
+ return NewDataLiteral(idx);
+ });
+
+ TCallableBuilder callableBuilder(Env, __func__, returnType); // Arguments are added in this order: storage, item type, key columns tuple, `any` flag.
+ callableBuilder.Add(blockStorage);
+ callableBuilder.Add(TRuntimeNode(streamItemType, true)); // The type itself is passed as a node; note the `true` flag here versus `false` on the returned node.
+ callableBuilder.Add(NewTuple(keyColumnsNodes));
+ callableBuilder.Add(NewDataLiteral((bool)any));
+
+ return TRuntimeNode(callableBuilder.Build(), false); // NOTE(review): the `false` flag presumably marks the node as non-immediate - confirm against TRuntimeNode.
+}
+
+TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightBlockStorage, TType* rightStreamItemType, EJoinKind joinKind,
const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& leftKeyDrops,
- const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, bool rightAny, TType* returnType
+ const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, TType* returnType
) {
- if constexpr (RuntimeVersion < 53U) {
+ if constexpr (RuntimeVersion < 59U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- if (RuntimeVersion < 57U && joinKind == EJoinKind::Cross) {
- THROW yexception() << __func__ << " does not support cross join in runtime version (" << RuntimeVersion << ")";
+
+ MKQL_ENSURE(rightBlockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type");
+ auto rightBlockStorageType = AS_TYPE(TResourceType, rightBlockStorage.GetStaticType());
+ if (joinKind != EJoinKind::Cross) {
+ MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource");
+ } else {
+ MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
}
MKQL_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross,
"Unsupported join kind");
MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key column count mismatch");
- if (joinKind == EJoinKind::Cross) {
- MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join");
- } else {
+ if (joinKind != EJoinKind::Cross) {
MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified");
+ } else {
+ MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join");
}
ValidateBlockStreamType(leftStream.GetStaticType());
- ValidateBlockStreamType(rightStream.GetStaticType());
+ ValidateBlockType(rightStreamItemType);
ValidateBlockStreamType(returnType);
TRuntimeNode::TList leftKeyColumnsNodes;
@@ -6064,13 +6122,13 @@ TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntime
TCallableBuilder callableBuilder(Env, __func__, returnType);
callableBuilder.Add(leftStream);
- callableBuilder.Add(rightStream);
+ callableBuilder.Add(rightBlockStorage);
+ callableBuilder.Add(TRuntimeNode(rightStreamItemType, true));
callableBuilder.Add(NewDataLiteral((ui32)joinKind));
callableBuilder.Add(NewTuple(leftKeyColumnsNodes));
callableBuilder.Add(NewTuple(leftKeyDropsNodes));
callableBuilder.Add(NewTuple(rightKeyColumnsNodes));
callableBuilder.Add(NewTuple(rightKeyDropsNodes));
- callableBuilder.Add(NewDataLiteral((bool)rightAny));
return TRuntimeNode(callableBuilder.Build(), false);
}