aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/mkql_program_builder.cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-02-18 13:35:23 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-02-18 13:35:23 +0000
commitd1f5b91da822b27faad83d50ecfdd2830a1be93e (patch)
tree78df3bf535cb8a5451afa402c51cb3f8d11b4d06 /yql/essentials/minikql/mkql_program_builder.cpp
parent22bc9b81495143d67a93bf58c936c5d5a65c8e8e (diff)
parenta2f16dc9eb108ecf11938c7c4275d701a3635bb7 (diff)
downloadydb-d1f5b91da822b27faad83d50ecfdd2830a1be93e.tar.gz
Merge pull request #14716 from ydb-platform/merge-libs-250218-0050
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp80
1 files changed, 69 insertions, 11 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index 717eb9ec32..b139c24058 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -321,6 +321,11 @@ void EnsureDataOrOptionalOfData(TRuntimeNode node) {
->GetItemType()->IsData(), "Expected data or optional of data");
}
+std::vector<TType*> ValidateBlockType(const TType* type, bool unwrap) {
+ const auto wideComponents = AS_TYPE(TMultiType, type)->GetElements();
+ return ValidateBlockItems(wideComponents, unwrap);
+}
+
std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap) {
const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, streamType));
return ValidateBlockItems(wideComponents, unwrap);
@@ -6009,29 +6014,82 @@ TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& a
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind,
+TRuntimeNode TProgramBuilder::BlockStorage(TRuntimeNode stream, TType* returnType) {
+ if constexpr (RuntimeVersion < 59U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ ValidateBlockStreamType(stream.GetStaticType());
+
+ MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type");
+ auto returnResourceType = AS_TYPE(TResourceType, returnType);
+ MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
+
+ TCallableBuilder callableBuilder(Env, __func__, returnType);
+ callableBuilder.Add(stream);
+
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
+TRuntimeNode TProgramBuilder::BlockMapJoinIndex(TRuntimeNode blockStorage, TType* streamItemType, const TArrayRef<const ui32>& keyColumns, bool any, TType* returnType) {
+ if constexpr (RuntimeVersion < 59U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ MKQL_ENSURE(blockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type");
+ auto blockStorageType = AS_TYPE(TResourceType, blockStorage.GetStaticType());
+ MKQL_ENSURE(blockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
+
+ ValidateBlockType(streamItemType);
+
+ MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type");
+ auto returnResourceType = AS_TYPE(TResourceType, returnType);
+ MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource");
+
+ TRuntimeNode::TList keyColumnsNodes;
+ keyColumnsNodes.reserve(keyColumns.size());
+ std::transform(keyColumns.cbegin(), keyColumns.cend(),
+ std::back_inserter(keyColumnsNodes), [this](const ui32 idx) {
+ return NewDataLiteral(idx);
+ });
+
+ TCallableBuilder callableBuilder(Env, __func__, returnType);
+ callableBuilder.Add(blockStorage);
+ callableBuilder.Add(TRuntimeNode(streamItemType, true));
+ callableBuilder.Add(NewTuple(keyColumnsNodes));
+ callableBuilder.Add(NewDataLiteral((bool)any));
+
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
+TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightBlockStorage, TType* rightStreamItemType, EJoinKind joinKind,
const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& leftKeyDrops,
- const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, bool rightAny, TType* returnType
+ const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, TType* returnType
) {
- if constexpr (RuntimeVersion < 53U) {
+ if constexpr (RuntimeVersion < 59U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- if (RuntimeVersion < 57U && joinKind == EJoinKind::Cross) {
- THROW yexception() << __func__ << " does not support cross join in runtime version (" << RuntimeVersion << ")";
+
+ MKQL_ENSURE(rightBlockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type");
+ auto rightBlockStorageType = AS_TYPE(TResourceType, rightBlockStorage.GetStaticType());
+ if (joinKind != EJoinKind::Cross) {
+ MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource");
+ } else {
+ MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
}
MKQL_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross,
"Unsupported join kind");
MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key column count mismatch");
- if (joinKind == EJoinKind::Cross) {
- MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join");
- } else {
+ if (joinKind != EJoinKind::Cross) {
MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified");
+ } else {
+ MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join");
}
ValidateBlockStreamType(leftStream.GetStaticType());
- ValidateBlockStreamType(rightStream.GetStaticType());
+ ValidateBlockType(rightStreamItemType);
ValidateBlockStreamType(returnType);
TRuntimeNode::TList leftKeyColumnsNodes;
@@ -6064,13 +6122,13 @@ TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntime
TCallableBuilder callableBuilder(Env, __func__, returnType);
callableBuilder.Add(leftStream);
- callableBuilder.Add(rightStream);
+ callableBuilder.Add(rightBlockStorage);
+ callableBuilder.Add(TRuntimeNode(rightStreamItemType, true));
callableBuilder.Add(NewDataLiteral((ui32)joinKind));
callableBuilder.Add(NewTuple(leftKeyColumnsNodes));
callableBuilder.Add(NewTuple(leftKeyDropsNodes));
callableBuilder.Add(NewTuple(rightKeyColumnsNodes));
callableBuilder.Add(NewTuple(rightKeyDropsNodes));
- callableBuilder.Add(NewDataLiteral((bool)rightAny));
return TRuntimeNode(callableBuilder.Build(), false);
}