diff options
author | aneporada <aneporada@yandex-team.com> | 2024-11-12 14:35:35 +0300 |
---|---|---|
committer | aneporada <aneporada@yandex-team.com> | 2024-11-12 14:49:16 +0300 |
commit | 0f8074b32931e95bc77e99fc0cc06079449a2f03 (patch) | |
tree | addc4b14e22412b769270d75018927719ee8e564 /yql/essentials/minikql/mkql_program_builder.cpp | |
parent | 39b2d17a032a25ea4334f40208471552f1e3561b (diff) | |
download | ydb-0f8074b32931e95bc77e99fc0cc06079449a2f03.tar.gz |
Merge PR #10707: Fixed: Make block combine use stream instead of flow
commit_hash:946462d1ea7e74758c7d6f86cc30cd674dc2195e
Diffstat (limited to 'yql/essentials/minikql/mkql_program_builder.cpp')
-rw-r--r-- | yql/essentials/minikql/mkql_program_builder.cpp | 104 |
1 files changed, 87 insertions, 17 deletions
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index 81f857b4be..0a4cd30828 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -5730,14 +5730,15 @@ TRuntimeNode TProgramBuilder::BlockBitCast(TRuntimeNode value, TType* targetType return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, std::optional<ui32> filterColumn, - const TArrayRef<const TAggInfo>& aggs, TType* returnType) { - if constexpr (RuntimeVersion < 31U) { - THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; - } +TRuntimeNode TProgramBuilder::BuildBlockCombineAll(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn, + const TArrayRef<const TAggInfo>& aggs, TType* returnType) { + const auto inputType = input.GetStaticType(); + MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type"); + MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type"); + + TCallableBuilder builder(Env, callableName, returnType); + builder.Add(input); - TCallableBuilder builder(Env, __func__, returnType); - builder.Add(flow); if (!filterColumn) { builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id)); } else { @@ -5759,14 +5760,32 @@ TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode flow, std::optional<u return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys, +TRuntimeNode TProgramBuilder::BlockCombineAll(TRuntimeNode stream, std::optional<ui32> filterColumn, const TArrayRef<const TAggInfo>& aggs, TType* returnType) { if constexpr (RuntimeVersion < 31U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - TCallableBuilder builder(Env, __func__, returnType); - builder.Add(flow); + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type"); + MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type"); + + if constexpr (RuntimeVersion < 52U) { + const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType()); + return FromFlow(BuildBlockCombineAll(__func__, ToFlow(stream), filterColumn, aggs, flowReturnType)); + } else { + return BuildBlockCombineAll(__func__, stream, filterColumn, aggs, returnType); + } +} + +TRuntimeNode TProgramBuilder::BuildBlockCombineHashed(const std::string_view& callableName, TRuntimeNode input, std::optional<ui32> filterColumn, + const TArrayRef<ui32>& keys, const TArrayRef<const TAggInfo>& aggs, TType* returnType) { + const auto inputType = input.GetStaticType(); + MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type"); + MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type"); + + TCallableBuilder builder(Env, callableName, returnType); + builder.Add(input); + if (!filterColumn) { builder.Add(NewEmptyOptionalDataLiteral(NUdf::TDataType<ui32>::Id)); } else { @@ -5794,14 +5813,31 @@ TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode flow, std::optiona return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys, +TRuntimeNode TProgramBuilder::BlockCombineHashed(TRuntimeNode stream, std::optional<ui32> filterColumn, const TArrayRef<ui32>& keys, const TArrayRef<const TAggInfo>& aggs, TType* returnType) { if constexpr (RuntimeVersion < 31U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - TCallableBuilder builder(Env, __func__, returnType); - builder.Add(flow); + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type"); + MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type"); + + if constexpr (RuntimeVersion < 52U) { + const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType()); + return FromFlow(BuildBlockCombineHashed(__func__, ToFlow(stream), filterColumn, keys, aggs, flowReturnType)); + } else { + return BuildBlockCombineHashed(__func__, stream, filterColumn, keys, aggs, returnType); + } +} + +TRuntimeNode TProgramBuilder::BuildBlockMergeFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, TType* returnType) { + const auto inputType = input.GetStaticType(); + MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type"); + MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type"); + + TCallableBuilder builder(Env, callableName, returnType); + builder.Add(input); TVector<TRuntimeNode> keyNodes; for (const auto& key : keys) { @@ -5824,14 +5860,31 @@ TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode flow, const return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode flow, const TArrayRef<ui32>& keys, - const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) { +TRuntimeNode TProgramBuilder::BlockMergeFinalizeHashed(TRuntimeNode stream, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, TType* returnType) { if constexpr (RuntimeVersion < 31U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - TCallableBuilder builder(Env, __func__, returnType); - builder.Add(flow); + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type"); + MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type"); + + if constexpr (RuntimeVersion < 52U) { + const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType()); + return FromFlow(BuildBlockMergeFinalizeHashed(__func__, ToFlow(stream), keys, aggs, flowReturnType)); + } else { + return BuildBlockMergeFinalizeHashed(__func__, stream, keys, aggs, returnType); + } +} + +TRuntimeNode TProgramBuilder::BuildBlockMergeManyFinalizeHashed(const std::string_view& callableName, TRuntimeNode input, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) { + const auto inputType = input.GetStaticType(); + MKQL_ENSURE(inputType->IsStream() || inputType->IsFlow(), "Expected either stream or flow as input type"); + MKQL_ENSURE(returnType->IsStream() || returnType->IsFlow(), "Expected either stream or flow as return type"); + + TCallableBuilder builder(Env, callableName, returnType); + builder.Add(input); TVector<TRuntimeNode> keyNodes; for (const auto& key : keys) { @@ -5866,6 +5919,23 @@ TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode flow, co return TRuntimeNode(builder.Build(), false); } +TRuntimeNode TProgramBuilder::BlockMergeManyFinalizeHashed(TRuntimeNode stream, const TArrayRef<ui32>& keys, + const TArrayRef<const TAggInfo>& aggs, ui32 streamIndex, const TVector<TVector<ui32>>& streams, TType* returnType) { + if constexpr (RuntimeVersion < 31U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + MKQL_ENSURE(stream.GetStaticType()->IsStream(), "Expected stream as input type"); + MKQL_ENSURE(returnType->IsStream(), "Expected stream as return type"); + + if constexpr (RuntimeVersion < 52U) { + const auto flowReturnType = NewFlowType(AS_TYPE(TStreamType, returnType)->GetItemType()); + return FromFlow(BuildBlockMergeManyFinalizeHashed(__func__, ToFlow(stream), keys, aggs, streamIndex, streams, flowReturnType)); + } else { + return BuildBlockMergeManyFinalizeHashed(__func__, stream, keys, aggs, streamIndex, streams, returnType); + } +} + TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& args, const TArrayLambda& handler) { if constexpr (RuntimeVersion < 39U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; |