aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/mkql_program_builder.cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-02-18 00:51:31 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-02-18 00:51:31 +0000
commitd46fe70ab3363efe215e4c7b142fb2e25e772f8e (patch)
tree1931cba78c3a24456f8f0dca6d80ba3cbc147acd /yql/essentials/minikql/mkql_program_builder.cpp
parentc25b7ee30559ef027fbc049354af1debffb6c1c6 (diff)
parent8fe93946bc369873a7ffbb3a7403463aa80e3117 (diff)
downloadydb-d46fe70ab3363efe215e4c7b142fb2e25e772f8e.tar.gz
Merge branch 'rightlib' into merge-libs-250218-0050
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp80
1 files changed, 69 insertions, 11 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index 717eb9ec32..b139c24058 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -321,6 +321,11 @@ void EnsureDataOrOptionalOfData(TRuntimeNode node) {
->GetItemType()->IsData(), "Expected data or optional of data");
}
+std::vector<TType*> ValidateBlockType(const TType* type, bool unwrap) {
+ const auto wideComponents = AS_TYPE(TMultiType, type)->GetElements();
+ return ValidateBlockItems(wideComponents, unwrap);
+}
+
std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap) {
const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, streamType));
return ValidateBlockItems(wideComponents, unwrap);
@@ -6009,29 +6014,82 @@ TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& a
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind,
+TRuntimeNode TProgramBuilder::BlockStorage(TRuntimeNode stream, TType* returnType) {
+ if constexpr (RuntimeVersion < 59U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ ValidateBlockStreamType(stream.GetStaticType());
+
+ MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type");
+ auto returnResourceType = AS_TYPE(TResourceType, returnType);
+ MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
+
+ TCallableBuilder callableBuilder(Env, __func__, returnType);
+ callableBuilder.Add(stream);
+
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
+TRuntimeNode TProgramBuilder::BlockMapJoinIndex(TRuntimeNode blockStorage, TType* streamItemType, const TArrayRef<const ui32>& keyColumns, bool any, TType* returnType) {
+ if constexpr (RuntimeVersion < 59U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ MKQL_ENSURE(blockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type");
+ auto blockStorageType = AS_TYPE(TResourceType, blockStorage.GetStaticType());
+ MKQL_ENSURE(blockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
+
+ ValidateBlockType(streamItemType);
+
+ MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type");
+ auto returnResourceType = AS_TYPE(TResourceType, returnType);
+ MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource");
+
+ TRuntimeNode::TList keyColumnsNodes;
+ keyColumnsNodes.reserve(keyColumns.size());
+ std::transform(keyColumns.cbegin(), keyColumns.cend(),
+ std::back_inserter(keyColumnsNodes), [this](const ui32 idx) {
+ return NewDataLiteral(idx);
+ });
+
+ TCallableBuilder callableBuilder(Env, __func__, returnType);
+ callableBuilder.Add(blockStorage);
+ callableBuilder.Add(TRuntimeNode(streamItemType, true));
+ callableBuilder.Add(NewTuple(keyColumnsNodes));
+ callableBuilder.Add(NewDataLiteral((bool)any));
+
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
+TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightBlockStorage, TType* rightStreamItemType, EJoinKind joinKind,
const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& leftKeyDrops,
- const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, bool rightAny, TType* returnType
+ const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, TType* returnType
) {
- if constexpr (RuntimeVersion < 53U) {
+ if constexpr (RuntimeVersion < 59U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- if (RuntimeVersion < 57U && joinKind == EJoinKind::Cross) {
- THROW yexception() << __func__ << " does not support cross join in runtime version (" << RuntimeVersion << ")";
+
+ MKQL_ENSURE(rightBlockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type");
+ auto rightBlockStorageType = AS_TYPE(TResourceType, rightBlockStorage.GetStaticType());
+ if (joinKind != EJoinKind::Cross) {
+ MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource");
+ } else {
+ MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
}
MKQL_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross,
"Unsupported join kind");
MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key column count mismatch");
- if (joinKind == EJoinKind::Cross) {
- MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join");
- } else {
+ if (joinKind != EJoinKind::Cross) {
MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified");
+ } else {
+ MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join");
}
ValidateBlockStreamType(leftStream.GetStaticType());
- ValidateBlockStreamType(rightStream.GetStaticType());
+ ValidateBlockType(rightStreamItemType);
ValidateBlockStreamType(returnType);
TRuntimeNode::TList leftKeyColumnsNodes;
@@ -6064,13 +6122,13 @@ TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntime
TCallableBuilder callableBuilder(Env, __func__, returnType);
callableBuilder.Add(leftStream);
- callableBuilder.Add(rightStream);
+ callableBuilder.Add(rightBlockStorage);
+ callableBuilder.Add(TRuntimeNode(rightStreamItemType, true));
callableBuilder.Add(NewDataLiteral((ui32)joinKind));
callableBuilder.Add(NewTuple(leftKeyColumnsNodes));
callableBuilder.Add(NewTuple(leftKeyDropsNodes));
callableBuilder.Add(NewTuple(rightKeyColumnsNodes));
callableBuilder.Add(NewTuple(rightKeyDropsNodes));
- callableBuilder.Add(NewDataLiteral((bool)rightAny));
return TRuntimeNode(callableBuilder.Build(), false);
}