diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2023-02-01 19:49:42 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2023-02-01 19:49:42 +0300 |
commit | f7c1df39a660e1a15232fe3c1a59231e1a653a67 (patch) | |
tree | c44e8fb8b31755e7e7322c8e71074fe6395835c7 | |
parent | d6f0689a827ab421a99aee1610eebc41b0cb4dab (diff) | |
download | ydb-f7c1df39a660e1a15232fe3c1a59231e1a653a67.tar.gz |
Fix memory leak on join any.
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_join.cpp | 30 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp | 82 |
2 files changed, 100 insertions, 12 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp index 88682498dd8..04946f18b93 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp @@ -762,6 +762,7 @@ public: , List1(Self->PackerLeft.RefMutableObject(ctx, false, Self->InputLeftType), IsAnyJoinLeft(Self->AnyJoinSettings), Self->InputLeftType->GetElementsCount()) , List2(Self->PackerRight.RefMutableObject(ctx, false, Self->InputRightType), IsAnyJoinRight(Self->AnyJoinSettings), Self->InputRightType->GetElementsCount()) , Fields(GetPointers(Values)) + , Stubs(Values.size(), nullptr) { Init(); } @@ -817,12 +818,14 @@ public: } if (Self->SortedTableOrder && *Self->SortedTableOrder == RightIndex) { - TLiveFetcher fetcher = [this] (TComputationContext& ctx, NUdf::TUnboxedValue* output) { - if (const auto status = Fetcher(ctx, Fields.data()); EFetchResult::One != status) - return status; - std::transform(Self->LeftInputColumns.cbegin(), Self->LeftInputColumns.cend(), output, [this] (ui32 index) { return std::move(this->Values[index]); }); - return EFetchResult::One; - }; + auto fetcher = IsAnyJoinLeft(Self->AnyJoinSettings) ? + TLiveFetcher(std::bind(Fetcher, std::placeholders::_1, Stubs.data())): + [this] (TComputationContext& ctx, NUdf::TUnboxedValue* output) { + if (const auto status = Fetcher(ctx, Fields.data()); EFetchResult::One != status) + return status; + std::transform(Self->LeftInputColumns.cbegin(), Self->LeftInputColumns.cend(), output, [this] (ui32 index) { return std::move(this->Values[index]); }); + return EFetchResult::One; + }; std::transform(Self->LeftInputColumns.cbegin(), Self->LeftInputColumns.cend(), Values.data(), [this] (ui32 index) { return std::move(this->Values[index]); }); List1.Live(std::move(fetcher), Values.data()); EatInput = false; @@ -844,12 +847,14 @@ public: } if (Self->SortedTableOrder && *Self->SortedTableOrder == LeftIndex) { - TLiveFetcher fetcher = [this] (TComputationContext& ctx, NUdf::TUnboxedValue* output) { - if (const auto status = Fetcher(ctx, Fields.data()); EFetchResult::One != status) - return status; - std::transform(Self->RightInputColumns.cbegin(), Self->RightInputColumns.cend(), output, [this] (ui32 index) { return std::move(this->Values[index]); }); - return EFetchResult::One; - }; + auto fetcher = IsAnyJoinRight(Self->AnyJoinSettings) ? + TLiveFetcher(std::bind(Fetcher, std::placeholders::_1, Stubs.data())): + [this] (TComputationContext& ctx, NUdf::TUnboxedValue* output) { + if (const auto status = Fetcher(ctx, Fields.data()); EFetchResult::One != status) + return status; + std::transform(Self->RightInputColumns.cbegin(), Self->RightInputColumns.cend(), output, [this] (ui32 index) { return std::move(this->Values[index]); }); + return EFetchResult::One; + }; std::transform(Self->RightInputColumns.cbegin(), Self->RightInputColumns.cend(), Values.data(), [this] (ui32 index) { return std::move(this->Values[index]); }); List2.Live(std::move(fetcher), Values.data()); EatInput = false; @@ -1118,6 +1123,7 @@ public: NUdf::TUnboxedValue* ResItems = nullptr; const std::vector<NUdf::TUnboxedValue*> Fields; + const std::vector<NUdf::TUnboxedValue*> Stubs; }; TWideCommonJoinCoreWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, const TTupleType* inputLeftType, const TTupleType* inputRightType, diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp index 89162e220b4..fb25e1ad1be 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp @@ -240,6 +240,88 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) { UNIT_ASSERT(!iterator.Next(item)); UNIT_ASSERT(!iterator.Next(item)); } + + Y_UNIT_TEST_LLVM(ExclusionOrderLeftFirstAny) { + TSetup<LLVM> setup; + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto indexType = pb.NewDataType(NUdf::TDataType<ui32>::Id); + const auto stringType = pb.NewDataType(NUdf::TDataType<NUdf::TUtf8>::Id); + const auto optStrType = pb.NewOptionalType(stringType); + const auto optionalType = pb.NewOptionalType(pb.NewDataType(NUdf::TDataType<i32>::Id)); + const auto tupleType = pb.NewTupleType({optionalType, optStrType, optStrType, indexType}); + + const auto value1 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 1")); + const auto value2 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 2")); + const auto value3 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 3")); + + const auto data1 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(1)), pb.NewEmptyOptional(optStrType), value1, pb.NewDataLiteral<ui32>(1)}); + const auto data2 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(2)), pb.NewEmptyOptional(optStrType), value2, pb.NewDataLiteral<ui32>(1)}); + const auto data3 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(3)), pb.NewEmptyOptional(optStrType), value3, pb.NewDataLiteral<ui32>(1)}); + const auto data4 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(4)), pb.NewEmptyOptional(optStrType), pb.NewEmptyOptional(optStrType), pb.NewDataLiteral<ui32>(1)}); + + const auto list = pb.NewList(tupleType, {data1, data2, data3, data4}); + + const auto outputType = pb.NewFlowType(pb.NewTupleType({optStrType, optStrType})); + + const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("ACHTUNG MINEN!"); + + const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.CommonJoinCore(pb.ExpandMap(pb.ToFlow(list), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.NewOptional(pb.Unwrap(pb.Nth(item, 2U), landmine, __FILE__, __LINE__, 0)), pb.Nth(item, 3U)}; }), + EJoinKind::Exclusion, {1U, 0U}, {2U, 1U}, {0U}, {0U}, 0ULL, {0U}, EAnyJoinSettings::Right, 3U, outputType), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); }) + ); + + const auto graph = setup.BuildGraph(pgmReturn); + const auto iterator = graph->GetValue().GetListIterator(); + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT(!item.GetElement(0)); + UNBOXED_VALUE_STR_EQUAL(item.GetElement(1), "very long value 1"); + UNIT_ASSERT(!iterator.Next(item)); + UNIT_ASSERT(!iterator.Next(item)); + } + + Y_UNIT_TEST_LLVM(ExclusionOrderRightFirstAny) { + TSetup<LLVM> setup; + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto indexType = pb.NewDataType(NUdf::TDataType<ui32>::Id); + const auto stringType = pb.NewDataType(NUdf::TDataType<NUdf::TUtf8>::Id); + const auto optStrType = pb.NewOptionalType(stringType); + const auto optionalType = pb.NewOptionalType(pb.NewDataType(NUdf::TDataType<i32>::Id)); + const auto tupleType = pb.NewTupleType({optionalType, optStrType, optStrType, indexType}); + + const auto value1 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 1")); + const auto value2 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 2")); + const auto value3 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 3")); + + const auto data1 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(1)), value1, pb.NewEmptyOptional(optStrType), pb.NewDataLiteral<ui32>(0)}); + const auto data2 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(2)), value2, pb.NewEmptyOptional(optStrType), pb.NewDataLiteral<ui32>(0)}); + const auto data3 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(3)), value3, pb.NewEmptyOptional(optStrType), pb.NewDataLiteral<ui32>(0)}); + const auto data4 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(4)), pb.NewEmptyOptional(optStrType), pb.NewEmptyOptional(optStrType), pb.NewDataLiteral<ui32>(0)}); + + const auto list = pb.NewList(tupleType, {data1, data2, data3, data4}); + + const auto outputType = pb.NewFlowType(pb.NewTupleType({optStrType, optStrType})); + + const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("ACHTUNG MINEN!"); + + const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.CommonJoinCore(pb.ExpandMap(pb.ToFlow(list), + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.NewOptional(pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0)), pb.Nth(item, 2U), pb.Nth(item, 3U)}; }), + EJoinKind::Exclusion, {1U, 0U}, {2U, 1U}, {0U}, {0U}, 0ULL, {1U}, EAnyJoinSettings::Left, 3U, outputType), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); }) + ); + + const auto graph = setup.BuildGraph(pgmReturn); + const auto iterator = graph->GetValue().GetListIterator(); + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT(!item.GetElement(1)); + UNBOXED_VALUE_STR_EQUAL(item.GetElement(0), "very long value 1"); + UNIT_ASSERT(!iterator.Next(item)); + UNIT_ASSERT(!iterator.Next(item)); + } } #endif } |