aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2023-02-01 19:49:42 +0300
committera-romanov <Anton.Romanov@ydb.tech>2023-02-01 19:49:42 +0300
commitf7c1df39a660e1a15232fe3c1a59231e1a653a67 (patch)
treec44e8fb8b31755e7e7322c8e71074fe6395835c7
parentd6f0689a827ab421a99aee1610eebc41b0cb4dab (diff)
downloadydb-f7c1df39a660e1a15232fe3c1a59231e1a653a67.tar.gz
Fix memory leak on join any.
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_join.cpp30
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp82
2 files changed, 100 insertions, 12 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp
index 88682498dd8..04946f18b93 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_join.cpp
@@ -762,6 +762,7 @@ public:
, List1(Self->PackerLeft.RefMutableObject(ctx, false, Self->InputLeftType), IsAnyJoinLeft(Self->AnyJoinSettings), Self->InputLeftType->GetElementsCount())
, List2(Self->PackerRight.RefMutableObject(ctx, false, Self->InputRightType), IsAnyJoinRight(Self->AnyJoinSettings), Self->InputRightType->GetElementsCount())
, Fields(GetPointers(Values))
+ , Stubs(Values.size(), nullptr)
{
Init();
}
@@ -817,12 +818,14 @@ public:
}
if (Self->SortedTableOrder && *Self->SortedTableOrder == RightIndex) {
- TLiveFetcher fetcher = [this] (TComputationContext& ctx, NUdf::TUnboxedValue* output) {
- if (const auto status = Fetcher(ctx, Fields.data()); EFetchResult::One != status)
- return status;
- std::transform(Self->LeftInputColumns.cbegin(), Self->LeftInputColumns.cend(), output, [this] (ui32 index) { return std::move(this->Values[index]); });
- return EFetchResult::One;
- };
+ auto fetcher = IsAnyJoinLeft(Self->AnyJoinSettings) ?
+ TLiveFetcher(std::bind(Fetcher, std::placeholders::_1, Stubs.data())):
+ [this] (TComputationContext& ctx, NUdf::TUnboxedValue* output) {
+ if (const auto status = Fetcher(ctx, Fields.data()); EFetchResult::One != status)
+ return status;
+ std::transform(Self->LeftInputColumns.cbegin(), Self->LeftInputColumns.cend(), output, [this] (ui32 index) { return std::move(this->Values[index]); });
+ return EFetchResult::One;
+ };
std::transform(Self->LeftInputColumns.cbegin(), Self->LeftInputColumns.cend(), Values.data(), [this] (ui32 index) { return std::move(this->Values[index]); });
List1.Live(std::move(fetcher), Values.data());
EatInput = false;
@@ -844,12 +847,14 @@ public:
}
if (Self->SortedTableOrder && *Self->SortedTableOrder == LeftIndex) {
- TLiveFetcher fetcher = [this] (TComputationContext& ctx, NUdf::TUnboxedValue* output) {
- if (const auto status = Fetcher(ctx, Fields.data()); EFetchResult::One != status)
- return status;
- std::transform(Self->RightInputColumns.cbegin(), Self->RightInputColumns.cend(), output, [this] (ui32 index) { return std::move(this->Values[index]); });
- return EFetchResult::One;
- };
+ auto fetcher = IsAnyJoinRight(Self->AnyJoinSettings) ?
+ TLiveFetcher(std::bind(Fetcher, std::placeholders::_1, Stubs.data())):
+ [this] (TComputationContext& ctx, NUdf::TUnboxedValue* output) {
+ if (const auto status = Fetcher(ctx, Fields.data()); EFetchResult::One != status)
+ return status;
+ std::transform(Self->RightInputColumns.cbegin(), Self->RightInputColumns.cend(), output, [this] (ui32 index) { return std::move(this->Values[index]); });
+ return EFetchResult::One;
+ };
std::transform(Self->RightInputColumns.cbegin(), Self->RightInputColumns.cend(), Values.data(), [this] (ui32 index) { return std::move(this->Values[index]); });
List2.Live(std::move(fetcher), Values.data());
EatInput = false;
@@ -1118,6 +1123,7 @@ public:
NUdf::TUnboxedValue* ResItems = nullptr;
const std::vector<NUdf::TUnboxedValue*> Fields;
+ const std::vector<NUdf::TUnboxedValue*> Stubs;
};
TWideCommonJoinCoreWrapper(TComputationMutables& mutables, IComputationWideFlowNode* flow, const TTupleType* inputLeftType, const TTupleType* inputRightType,
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp
index 89162e220b4..fb25e1ad1be 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_join_ut.cpp
@@ -240,6 +240,88 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) {
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
+
+ Y_UNIT_TEST_LLVM(ExclusionOrderLeftFirstAny) {
+ TSetup<LLVM> setup;
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto indexType = pb.NewDataType(NUdf::TDataType<ui32>::Id);
+ const auto stringType = pb.NewDataType(NUdf::TDataType<NUdf::TUtf8>::Id);
+ const auto optStrType = pb.NewOptionalType(stringType);
+ const auto optionalType = pb.NewOptionalType(pb.NewDataType(NUdf::TDataType<i32>::Id));
+ const auto tupleType = pb.NewTupleType({optionalType, optStrType, optStrType, indexType});
+
+ const auto value1 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 1"));
+ const auto value2 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 2"));
+ const auto value3 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 3"));
+
+ const auto data1 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(1)), pb.NewEmptyOptional(optStrType), value1, pb.NewDataLiteral<ui32>(1)});
+ const auto data2 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(2)), pb.NewEmptyOptional(optStrType), value2, pb.NewDataLiteral<ui32>(1)});
+ const auto data3 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(3)), pb.NewEmptyOptional(optStrType), value3, pb.NewDataLiteral<ui32>(1)});
+ const auto data4 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(4)), pb.NewEmptyOptional(optStrType), pb.NewEmptyOptional(optStrType), pb.NewDataLiteral<ui32>(1)});
+
+ const auto list = pb.NewList(tupleType, {data1, data2, data3, data4});
+
+ const auto outputType = pb.NewFlowType(pb.NewTupleType({optStrType, optStrType}));
+
+ const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("ACHTUNG MINEN!");
+
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.CommonJoinCore(pb.ExpandMap(pb.ToFlow(list),
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.NewOptional(pb.Unwrap(pb.Nth(item, 2U), landmine, __FILE__, __LINE__, 0)), pb.Nth(item, 3U)}; }),
+ EJoinKind::Exclusion, {1U, 0U}, {2U, 1U}, {0U}, {0U}, 0ULL, {0U}, EAnyJoinSettings::Right, 3U, outputType),
+ [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); })
+ );
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto iterator = graph->GetValue().GetListIterator();
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT(!item.GetElement(0));
+ UNBOXED_VALUE_STR_EQUAL(item.GetElement(1), "very long value 1");
+ UNIT_ASSERT(!iterator.Next(item));
+ UNIT_ASSERT(!iterator.Next(item));
+ }
+
+ Y_UNIT_TEST_LLVM(ExclusionOrderRightFirstAny) {
+ TSetup<LLVM> setup;
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto indexType = pb.NewDataType(NUdf::TDataType<ui32>::Id);
+ const auto stringType = pb.NewDataType(NUdf::TDataType<NUdf::TUtf8>::Id);
+ const auto optStrType = pb.NewOptionalType(stringType);
+ const auto optionalType = pb.NewOptionalType(pb.NewDataType(NUdf::TDataType<i32>::Id));
+ const auto tupleType = pb.NewTupleType({optionalType, optStrType, optStrType, indexType});
+
+ const auto value1 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 1"));
+ const auto value2 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 2"));
+ const auto value3 = pb.NewOptional(pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("very long value 3"));
+
+ const auto data1 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(1)), value1, pb.NewEmptyOptional(optStrType), pb.NewDataLiteral<ui32>(0)});
+ const auto data2 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(2)), value2, pb.NewEmptyOptional(optStrType), pb.NewDataLiteral<ui32>(0)});
+ const auto data3 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(3)), value3, pb.NewEmptyOptional(optStrType), pb.NewDataLiteral<ui32>(0)});
+ const auto data4 = pb.NewTuple(tupleType, {pb.NewOptional(pb.NewDataLiteral<i32>(4)), pb.NewEmptyOptional(optStrType), pb.NewEmptyOptional(optStrType), pb.NewDataLiteral<ui32>(0)});
+
+ const auto list = pb.NewList(tupleType, {data1, data2, data3, data4});
+
+ const auto outputType = pb.NewFlowType(pb.NewTupleType({optStrType, optStrType}));
+
+ const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::Utf8>("ACHTUNG MINEN!");
+
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.CommonJoinCore(pb.ExpandMap(pb.ToFlow(list),
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.NewOptional(pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0)), pb.Nth(item, 2U), pb.Nth(item, 3U)}; }),
+ EJoinKind::Exclusion, {1U, 0U}, {2U, 1U}, {0U}, {0U}, 0ULL, {1U}, EAnyJoinSettings::Left, 3U, outputType),
+ [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); })
+ );
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto iterator = graph->GetValue().GetListIterator();
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT(!item.GetElement(1));
+ UNBOXED_VALUE_STR_EQUAL(item.GetElement(0), "very long value 1");
+ UNIT_ASSERT(!iterator.Next(item));
+ UNIT_ASSERT(!iterator.Next(item));
+ }
}
#endif
}