diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-08-03 14:31:39 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-08-03 14:31:39 +0300 |
commit | 048cbb77287dd13c29d62e1353e18683d4b57b9f (patch) | |
tree | 986207f98db12f277799af47986ce3a721b210ae | |
parent | 15c8b667f31654db48aa974062cc70aade3e25e3 (diff) | |
download | ydb-048cbb77287dd13c29d62e1353e18683d4b57b9f.tar.gz |
Fix datarace in NarrowFlatMap, NarrowMultiMap, *ToDict, MapJoin
fix datarace in *ToDict funcions
fix datarace in NarrowMultiMap
Fix datarace in NarrowFlatMap
10 files changed, 113 insertions, 42 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp index a9d49e109d..8fb3894b04 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp @@ -407,16 +407,22 @@ class TNarrowFlatMapFlowWrapper : public TStatefulFlowCodegeneratorNode<TNarrowF using TBaseComputation = TStatefulFlowCodegeneratorNode<TNarrowFlatMapFlowWrapper>; public: TNarrowFlatMapFlowWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* output) - : TBaseComputation(mutables, flow, kind, EValueRepresentation::Embedded), Flow(flow), Items(std::move(items)), Output(output), Fields(Items.size(), nullptr) + : TBaseComputation(mutables, flow, kind, EValueRepresentation::Embedded) + , Flow(flow) + , Items(std::move(items)) + , Output(output) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size())) {} NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { + auto** fields = ctx.WideFields.data() + WideFieldsIndex; + if (state.IsInvalid()) { - for (auto i = 0U; i < Fields.size(); ++i) + for (auto i = 0U; i < Items.size(); ++i) if (Items[i]->GetDependencesCount() > 0U) - Fields[i] = &Items[i]->RefValue(ctx); + fields[i] = &Items[i]->RefValue(ctx); - switch (Flow->FetchValues(ctx, Fields.data())) { + switch (Flow->FetchValues(ctx, fields)) { case EFetchResult::Finish: return NUdf::TUnboxedValuePod::MakeFinish(); case EFetchResult::Yield: @@ -428,11 +434,11 @@ public: while (true) { if (auto output = Output->GetValue(ctx); output.IsFinish()) { - for (auto i = 0U; i < Fields.size(); ++i) + for (auto i = 0U; i < Items.size(); ++i) if (Items[i]->GetDependencesCount() > 0U) - Fields[i] = &Items[i]->RefValue(ctx); + fields[i] = &Items[i]->RefValue(ctx); - switch (Flow->FetchValues(ctx, Fields.data())) { + switch (Flow->FetchValues(ctx, fields)) { case EFetchResult::Finish: return NUdf::TUnboxedValuePod::MakeFinish(); case EFetchResult::Yield: @@ -514,7 +520,7 @@ private: const TComputationExternalNodePtrVector Items; IComputationNode* const Output; - mutable std::vector<NUdf::TUnboxedValue*> Fields; + const ui32 WideFieldsIndex; }; template <bool IsMultiRowPerItem, bool ResultContainerOpt> @@ -699,17 +705,23 @@ using TBaseComputation = std::conditional_t<IsMultiRowPerItem, TStatelessFlowCodegeneratorNode<TNarrowFlatMapWrapper<IsMultiRowPerItem, ResultContainerOpt>>>; public: TNarrowFlatMapWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationWideFlowNode* flow, const TComputationExternalNodePtrVector&& items, IComputationNode* newItem) - : TBaseComputation(mutables, flow, kind), Flow(flow), Items(std::move(items)), NewItem(newItem) - , PasstroughItem(GetPasstroughtMap({NewItem}, Items).front()), Fields(Items.size(), nullptr) + : TBaseComputation(mutables, flow, kind) + , Flow(flow) + , Items(std::move(items)) + , NewItem(newItem) + , PasstroughItem(GetPasstroughtMap({NewItem}, Items).front()) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size())) {} NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + auto** fields = ctx.WideFields.data() + WideFieldsIndex; + while (true) { - for (auto i = 0U; i < Fields.size(); ++i) + for (auto i = 0U; i < Items.size(); ++i) if (NewItem == Items[i] || Items[i]->GetDependencesCount() > 0U) - Fields[i] = &Items[i]->RefValue(ctx); + fields[i] = &Items[i]->RefValue(ctx); - switch (const auto result = Flow->FetchValues(ctx, Fields.data())) { + switch (const auto result = Flow->FetchValues(ctx, fields)) { case EFetchResult::Finish: return NUdf::TUnboxedValuePod::MakeFinish(); case EFetchResult::Yield: @@ -885,7 +897,7 @@ private: const std::optional<size_t> PasstroughItem; - mutable std::vector<NUdf::TUnboxedValue*> Fields; + const ui32 WideFieldsIndex; }; template <bool MultiOptional> diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp index e9e93aca67..7f49641dd5 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp @@ -411,8 +411,12 @@ class TNarrowMultiMapWrapper : public TStatefulFlowCodegeneratorNode<TNarrowMult using TBaseComputation = TStatefulFlowCodegeneratorNode<TNarrowMultiMapWrapper>; public: TNarrowMultiMapWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, TComputationNodePtrVector&& newItems) - : TBaseComputation(mutables, flow, kind), Flow(flow), Items(std::move(items)), NewItems(std::move(newItems)) - , PasstroughtMap(GetPasstroughtMap(Items, NewItems)), Fields(Items.size(), nullptr) + : TBaseComputation(mutables, flow, kind) + , Flow(flow) + , Items(std::move(items)) + , NewItems(std::move(newItems)) + , PasstroughtMap(GetPasstroughtMap(Items, NewItems)) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size())) {} NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { @@ -421,11 +425,13 @@ public: const auto pos = state.IsInvalid() ? 0ULL : state.Get<ui64>(); if (!pos) { - for (auto i = 0U; i < Fields.size(); ++i) + auto** fields = ctx.WideFields.data() + WideFieldsIndex; + + for (auto i = 0U; i < Items.size(); ++i) if (Items[i]->GetDependencesCount() > 0U || PasstroughtMap[i]) - Fields[i] = &Items[i]->RefValue(ctx); + fields[i] = &Items[i]->RefValue(ctx); - switch (Flow->FetchValues(ctx, Fields.data())) { + switch (Flow->FetchValues(ctx, fields)) { case EFetchResult::Finish: return NUdf::TUnboxedValuePod::MakeFinish(); case EFetchResult::Yield: @@ -513,7 +519,7 @@ private: const TPasstroughtMap PasstroughtMap; - mutable std::vector<NUdf::TUnboxedValue*> Fields; + const ui32 WideFieldsIndex; }; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp index 06133c0dfb..f743fb39cd 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp @@ -1025,7 +1025,7 @@ public: , Key(key) , ItemsCountHint(itemsCountHint) , PasstroughKey(GetPasstroughtMap({Key}, Items).front()) - , Fields(Items.size(), nullptr) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size())) { GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash); } @@ -1036,13 +1036,14 @@ public: } else if (!state.HasValue()) { MakeState(ctx, state); } + auto** fields = ctx.WideFields.data() + WideFieldsIndex; while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) { - for (auto i = 0U; i < Fields.size(); ++i) + for (auto i = 0U; i < Items.size(); ++i) if (Key == Items[i] || Items[i]->GetDependencesCount() > 0U) - Fields[i] = &Items[i]->RefValue(ctx); + fields[i] = &Items[i]->RefValue(ctx); - switch (const auto result = Flow->FetchValues(ctx, Fields.data())) { + switch (const auto result = Flow->FetchValues(ctx, fields)) { case EFetchResult::One: statePtr->Insert(Key->GetValue(ctx).Release()); continue; @@ -1191,7 +1192,7 @@ private: const std::optional<size_t> PasstroughKey; - mutable std::vector<NUdf::TUnboxedValue*> Fields; + const ui32 WideFieldsIndex; }; template <typename TMapAccumulator, bool IsStream> @@ -1544,7 +1545,7 @@ public: , ItemsCountHint(itemsCountHint) , PasstroughKey(GetPasstroughtMap({Key, Payload}, Items).front()) , PasstroughPayload(GetPasstroughtMap({Key, Payload}, Items).back()) - , Fields(Items.size(), nullptr) + , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size())) { GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash); } @@ -1555,13 +1556,14 @@ public: } else if (!state.HasValue()) { MakeState(ctx, state); } + auto** fields = ctx.WideFields.data() + WideFieldsIndex; while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) { - for (auto i = 0U; i < Fields.size(); ++i) + for (auto i = 0U; i < Items.size(); ++i) if (Key == Items[i] || Payload == Items[i] || Items[i]->GetDependencesCount() > 0U) - Fields[i] = &Items[i]->RefValue(ctx); + fields[i] = &Items[i]->RefValue(ctx); - switch (const auto result = Flow->FetchValues(ctx, Fields.data())) { + switch (const auto result = Flow->FetchValues(ctx, fields)) { case EFetchResult::One: statePtr->Insert(Key->GetValue(ctx).Release(), Payload->GetValue(ctx).Release()); continue; @@ -1717,6 +1719,7 @@ private: const std::optional<size_t> PasstroughPayload; mutable std::vector<NUdf::TUnboxedValue*> Fields; + const ui32 WideFieldsIndex; }; template <typename TAccumulator> diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp index d68fc71ac1..cd677661aa 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp @@ -143,7 +143,6 @@ public: #endif private: EFetchResult CalculateFirst(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - Y_VERIFY_DEBUG(WideFieldsIndex + Inputs.size() <= ctx.WideFields.size()); auto** fields = ctx.WideFields.data() + WideFieldsIndex; for (auto i = 0U; i < Inputs.size(); ++i) { @@ -196,7 +195,6 @@ private: } EFetchResult CalculateOther(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - Y_VERIFY_DEBUG(WideFieldsIndex + Inputs.size() <= ctx.WideFields.size()); auto** fields = ctx.WideFields.data() + WideFieldsIndex; for (auto i = 0U; i < Inputs.size(); ++i) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp index 183174128f..fa4bc5df4c 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp @@ -41,7 +41,6 @@ public: } EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - Y_VERIFY_DEBUG(WideFieldsIndex + ItemArgs.size() <= ctx.WideFields.size()); auto** fields = ctx.WideFields.data() + WideFieldsIndex; if (state.IsInvalid()) { @@ -109,7 +108,6 @@ private: return EFetchResult::One; } - Y_VERIFY_DEBUG(WideFieldsIndex + ItemArgs.size() <= ctx.WideFields.size()); auto** fields = ctx.WideFields.data() + WideFieldsIndex; for (auto i = 0U; i < ItemArgs.size(); ++i) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index c5346af0a0..7f2c4fcdce 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -237,7 +237,6 @@ public: const auto initUsage = MemLimit ? ctx.HolderFactory.GetMemoryUsed() : 0ULL; - Y_VERIFY_DEBUG(WideFieldsIndex + Nodes.ItemNodes.size() <= ctx.WideFields.size()); auto **fields = ctx.WideFields.data() + WideFieldsIndex; do { @@ -623,7 +622,6 @@ public: } if (const auto ptr = static_cast<TState*>(state.AsBoxed().Get())) { - Y_VERIFY_DEBUG(WideFieldsIndex + Nodes.ItemNodes.size() <= ctx.WideFields.size()); auto **fields = ctx.WideFields.data() + WideFieldsIndex; while (EFetchResult::Finish != ptr->InputStatus) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp index e466075688..b65ba68c21 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp @@ -47,7 +47,6 @@ public: State[i]->SetValue(ctx, InitState[i]->GetValue(ctx)); } - Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size()); auto** fields = ctx.WideFields.data() + WideFieldsIndex; while (true) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp index 04823e0b26..a3fb6dd736 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp @@ -23,7 +23,6 @@ protected: {} NYql::NUdf::TUnboxedValue** GetFields(TComputationContext& ctx) const { - Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size()); return ctx.WideFields.data() + WideFieldsIndex; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp index 47e7b1a5c4..32586e61a8 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp @@ -94,7 +94,6 @@ public: {} EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size()); auto** fields = ctx.WideFields.data() + WideFieldsIndex; for (auto i = 0U; i < Items.size(); ++i) @@ -188,7 +187,6 @@ public: {} NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size()); auto** fields = ctx.WideFields.data() + WideFieldsIndex; for (auto i = 0U; i < Items.size(); ++i) { diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp index 1ff7fd10d1..4528d44c33 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp @@ -30,7 +30,7 @@ TComputationNodeFactory GetListTestFactory() { }; } -TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { +TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable *list) { if (list) { return pb.ToFlow(TRuntimeNode(list, false)); } else { @@ -334,9 +334,57 @@ TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable *list = n } } +template<bool Flow> +TRuntimeNode CreateNarrowFlatMap(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.NarrowFlatMap( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + [&] (TRuntimeNode::TList item) -> TRuntimeNode { + auto x = pb.NewOptional(item.front()); + return Flow ? pb.ToFlow(x) : x; + } + ); +} + +TRuntimeNode CreateNarrowMultiMap(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.NarrowMultiMap( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList { + return {item.front(), item.front()}; + } + ); +} + +template<bool WithPayload> +TRuntimeNode CreateSqueezeToSortedDict(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.FlatMap( + pb.NarrowSqueezeToSortedDict( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + /*all*/ false, + /*keySelector*/ [&](TRuntimeNode::TList item) { return item.front(); }, + /*payloadSelector*/ [&](TRuntimeNode::TList ) { return WithPayload ? pb.NewDataLiteral<ui64>(0) : pb.NewVoid(); } + ), + [&] (TRuntimeNode item) { return pb.DictKeys(item); } + ); +} + Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { template<class T> - void ParallelProgTest(T f, bool useLLVM, ui64 testResult) { + void ParallelProgTest(T f, bool useLLVM, ui64 testResult, size_t vecSize = 100'000) { TTimer t("total: "); const ui32 cacheSize = 10; const ui32 inFlight = 3; @@ -351,7 +399,6 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id)); const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build(); - const ui32 vecSize = 100'000; auto progReturn = f(pb, vecSize, list); TExploringNodeVisitor explorer; @@ -450,8 +497,21 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { Y_UNIT_TEST_QUAD(Skip, Wide, UseLLVM) { ParallelProgTest(CreateSkip<Wide>, UseLLVM, 17389067750); } + + Y_UNIT_TEST_QUAD(NarrowFlatMap, Flow, UseLLVM) { + ParallelProgTest(CreateNarrowFlatMap<Flow>, UseLLVM, 17451450000); + } + + Y_UNIT_TEST_TWIN(NarrowMultiMap, UseLLVM) { + ParallelProgTest(CreateNarrowMultiMap, UseLLVM, 17451450000ull * 2); + } + + Y_UNIT_TEST_QUAD(SqueezeToSortedDict, WithPayload, UseLLVM) { + ParallelProgTest(CreateSqueezeToSortedDict<WithPayload>, UseLLVM, 125014500, 1000); + } } + Y_UNIT_TEST_SUITE(ComputationPatternCache) { Y_UNIT_TEST(Smoke) { const ui32 cacheSize = 10; |