aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/mkql_program_builder.cpp
diff options
context:
space:
mode:
authoraneporada <aneporada@yandex-team.com>2024-11-12 14:35:35 +0300
committeraneporada <aneporada@yandex-team.com>2024-11-12 14:49:16 +0300
commit0f8074b32931e95bc77e99fc0cc06079449a2f03 (patch)
treeaddc4b14e22412b769270d75018927719ee8e564 /yql/essentials/minikql/mkql_program_builder.cpp
parent39b2d17a032a25ea4334f40208471552f1e3561b (diff)
downloadydb-0f8074b32931e95bc77e99fc0cc06079449a2f03.tar.gz
Merge PR #10707: Fixed: Make block combine use stream instead of flow
commit_hash:946462d1ea7e74758c7d6f86cc30cd674dc2195e
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp104
1 files changed, 87 insertions, 17 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index 81f857b4be..0a4cd30828 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -5730,14 +5730,15 @@ TRuntimeNode TProgramBuilder::BlockBitCast(TRuntimeNode value, TType* targetType
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, std::optional<ui32> filterColumn,
- const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
- if constexpr (RuntimeVersion < 31U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
+TRuntimeNode TProgramBuilder::BuildBlockCombineAll(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
+ const auto inputType = input.GetStaticType();
+ MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type");
+ MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type");
+
+ TCallableBuilder builder(Env, callableName, returnType);
+ builder.Add(input);
- TCallableBuilder builder(Env, __func__, returnType);
- builder.Add(flow);
if (!filterColumn) {
builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id));
} else {
@@ -5759,14 +5760,32 @@ TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, std::optional<u
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,
+TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode stream, std::optional<ui32> filterColumn,
const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
if constexpr (RuntimeVersion < 31U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- TCallableBuilder builder(Env, __func__, returnType);
- builder.Add(flow);
+ MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type");
+ MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type");
+
+ if constexpr (RuntimeVersion < 52U) {
+ const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType());
+ return FromFlow(BuildBlockCombineAll(__func__, ToFlow(stream), filterColumn, aggs, flowReturnType));
+ } else {
+ return BuildBlockCombineAll(__func__, stream, filterColumn, aggs, returnType);
+ }
+}
+
+TRuntimeNode TProgramBuilder::BuildBlockCombineHashed(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn,
+ const TArrayRef<ui32>& keys, const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
+ const auto inputType = input.GetStaticType();
+ MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type");
+ MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type");
+
+ TCallableBuilder builder(Env, callableName, returnType);
+ builder.Add(input);
+
if (!filterColumn) {
builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id));
} else {
@@ -5794,14 +5813,31 @@ TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optiona
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys,
+TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode stream, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys,
const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
if constexpr (RuntimeVersion < 31U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- TCallableBuilder builder(Env, __func__, returnType);
- builder.Add(flow);
+ MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type");
+ MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type");
+
+ if constexpr (RuntimeVersion < 52U) {
+ const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType());
+ return FromFlow(BuildBlockCombineHashed(__func__, ToFlow(stream), filterColumn, keys, aggs, flowReturnType));
+ } else {
+ return BuildBlockCombineHashed(__func__, stream, filterColumn, keys, aggs, returnType);
+ }
+}
+
+TRuntimeNode TProgramBuilder::BuildBlockMergeFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
+ const auto inputType = input.GetStaticType();
+ MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type");
+ MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type");
+
+ TCallableBuilder builder(Env, callableName, returnType);
+ builder.Add(input);
TVector<TRuntimeNode> keyNodes;
for (const auto& key : keys) {
@@ -5824,14 +5860,31 @@ TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode flow, const
return TRuntimeNode(builder.Build(), false);
}
-TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys,
- const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) {
+TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode stream, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, TType* returnType) {
if constexpr (RuntimeVersion < 31U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}
- TCallableBuilder builder(Env, __func__, returnType);
- builder.Add(flow);
+ MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type");
+ MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type");
+
+ if constexpr (RuntimeVersion < 52U) {
+ const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType());
+ return FromFlow(BuildBlockMergeFinalizeHashed(__func__, ToFlow(stream), keys, aggs, flowReturnType));
+ } else {
+ return BuildBlockMergeFinalizeHashed(__func__, stream, keys, aggs, returnType);
+ }
+}
+
+TRuntimeNode TProgramBuilder::BuildBlockMergeManyFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) {
+ const auto inputType = input.GetStaticType();
+ MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type");
+ MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type");
+
+ TCallableBuilder builder(Env, callableName, returnType);
+ builder.Add(input);
TVector<TRuntimeNode> keyNodes;
for (const auto& key : keys) {
@@ -5866,6 +5919,23 @@ TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode flow, co
return TRuntimeNode(builder.Build(), false);
}
+TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode stream, const TArrayRef<ui32>& keys,
+ const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) {
+ if constexpr (RuntimeVersion < 31U) {
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type");
+ MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type");
+
+ if constexpr (RuntimeVersion < 52U) {
+ const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType());
+ return FromFlow(BuildBlockMergeManyFinalizeHashed(__func__, ToFlow(stream), keys, aggs, streamIndex, streams, flowReturnType));
+ } else {
+ return BuildBlockMergeManyFinalizeHashed(__func__, stream, keys, aggs, streamIndex, streams, returnType);
+ }
+}
+
TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& args, const TArrayLambda& handler) {
if constexpr (RuntimeVersion < 39U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;