aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: va-kuznecov <va-kuznecov@ydb.tech> 2022-08-03 14:31:39 +0300
committer: va-kuznecov <va-kuznecov@ydb.tech> 2022-08-03 14:31:39 +0300
commit: 048cbb77287dd13c29d62e1353e18683d4b57b9f (patch)
tree: 986207f98db12f277799af47986ce3a721b210ae
parent: 15c8b667f31654db48aa974062cc70aade3e25e3 (diff)
download: ydb-048cbb77287dd13c29d62e1353e18683d4b57b9f.tar.gz
Fix datarace in NarrowFlatMap, NarrowMultiMap, *ToDict, MapJoin
Fix datarace in *ToDict functions; fix datarace in NarrowMultiMap; fix datarace in NarrowFlatMap.
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp | 40
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp | 18
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp | 21
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp | 2
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp | 2
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp | 2
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp | 1
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp | 1
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp | 2
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp | 66
10 files changed, 113 insertions, 42 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp
index a9d49e109d..8fb3894b04 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_flatmap.cpp
@@ -407,16 +407,22 @@ class TNarrowFlatMapFlowWrapper : public TStatefulFlowCodegeneratorNode<TNarrowF
using TBaseComputation = TStatefulFlowCodegeneratorNode<TNarrowFlatMapFlowWrapper>;
public:
TNarrowFlatMapFlowWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, IComputationNode* output)
- : TBaseComputation(mutables, flow, kind, EValueRepresentation::Embedded), Flow(flow), Items(std::move(items)), Output(output), Fields(Items.size(), nullptr)
+ : TBaseComputation(mutables, flow, kind, EValueRepresentation::Embedded)
+ , Flow(flow)
+ , Items(std::move(items))
+ , Output(output)
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
{}
NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
+
if (state.IsInvalid()) {
- for (auto i = 0U; i < Fields.size(); ++i)
+ for (auto i = 0U; i < Items.size(); ++i)
if (Items[i]->GetDependencesCount() > 0U)
- Fields[i] = &Items[i]->RefValue(ctx);
+ fields[i] = &Items[i]->RefValue(ctx);
- switch (Flow->FetchValues(ctx, Fields.data())) {
+ switch (Flow->FetchValues(ctx, fields)) {
case EFetchResult::Finish:
return NUdf::TUnboxedValuePod::MakeFinish();
case EFetchResult::Yield:
@@ -428,11 +434,11 @@ public:
while (true) {
if (auto output = Output->GetValue(ctx); output.IsFinish()) {
- for (auto i = 0U; i < Fields.size(); ++i)
+ for (auto i = 0U; i < Items.size(); ++i)
if (Items[i]->GetDependencesCount() > 0U)
- Fields[i] = &Items[i]->RefValue(ctx);
+ fields[i] = &Items[i]->RefValue(ctx);
- switch (Flow->FetchValues(ctx, Fields.data())) {
+ switch (Flow->FetchValues(ctx, fields)) {
case EFetchResult::Finish:
return NUdf::TUnboxedValuePod::MakeFinish();
case EFetchResult::Yield:
@@ -514,7 +520,7 @@ private:
const TComputationExternalNodePtrVector Items;
IComputationNode* const Output;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
+ const ui32 WideFieldsIndex;
};
template <bool IsMultiRowPerItem, bool ResultContainerOpt>
@@ -699,17 +705,23 @@ using TBaseComputation = std::conditional_t<IsMultiRowPerItem,
TStatelessFlowCodegeneratorNode<TNarrowFlatMapWrapper<IsMultiRowPerItem, ResultContainerOpt>>>;
public:
TNarrowFlatMapWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationWideFlowNode* flow, const TComputationExternalNodePtrVector&& items, IComputationNode* newItem)
- : TBaseComputation(mutables, flow, kind), Flow(flow), Items(std::move(items)), NewItem(newItem)
- , PasstroughItem(GetPasstroughtMap({NewItem}, Items).front()), Fields(Items.size(), nullptr)
+ : TBaseComputation(mutables, flow, kind)
+ , Flow(flow)
+ , Items(std::move(items))
+ , NewItem(newItem)
+ , PasstroughItem(GetPasstroughtMap({NewItem}, Items).front())
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
{}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
+
while (true) {
- for (auto i = 0U; i < Fields.size(); ++i)
+ for (auto i = 0U; i < Items.size(); ++i)
if (NewItem == Items[i] || Items[i]->GetDependencesCount() > 0U)
- Fields[i] = &Items[i]->RefValue(ctx);
+ fields[i] = &Items[i]->RefValue(ctx);
- switch (const auto result = Flow->FetchValues(ctx, Fields.data())) {
+ switch (const auto result = Flow->FetchValues(ctx, fields)) {
case EFetchResult::Finish:
return NUdf::TUnboxedValuePod::MakeFinish();
case EFetchResult::Yield:
@@ -885,7 +897,7 @@ private:
const std::optional<size_t> PasstroughItem;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
+ const ui32 WideFieldsIndex;
};
template <bool MultiOptional>
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
index e9e93aca67..7f49641dd5 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_multimap.cpp
@@ -411,8 +411,12 @@ class TNarrowMultiMapWrapper : public TStatefulFlowCodegeneratorNode<TNarrowMult
using TBaseComputation = TStatefulFlowCodegeneratorNode<TNarrowMultiMapWrapper>;
public:
TNarrowMultiMapWrapper(TComputationMutables& mutables, EValueRepresentation kind, IComputationWideFlowNode* flow, TComputationExternalNodePtrVector&& items, TComputationNodePtrVector&& newItems)
- : TBaseComputation(mutables, flow, kind), Flow(flow), Items(std::move(items)), NewItems(std::move(newItems))
- , PasstroughtMap(GetPasstroughtMap(Items, NewItems)), Fields(Items.size(), nullptr)
+ : TBaseComputation(mutables, flow, kind)
+ , Flow(flow)
+ , Items(std::move(items))
+ , NewItems(std::move(newItems))
+ , PasstroughtMap(GetPasstroughtMap(Items, NewItems))
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
{}
NUdf::TUnboxedValuePod DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
@@ -421,11 +425,13 @@ public:
const auto pos = state.IsInvalid() ? 0ULL : state.Get<ui64>();
if (!pos) {
- for (auto i = 0U; i < Fields.size(); ++i)
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
+
+ for (auto i = 0U; i < Items.size(); ++i)
if (Items[i]->GetDependencesCount() > 0U || PasstroughtMap[i])
- Fields[i] = &Items[i]->RefValue(ctx);
+ fields[i] = &Items[i]->RefValue(ctx);
- switch (Flow->FetchValues(ctx, Fields.data())) {
+ switch (Flow->FetchValues(ctx, fields)) {
case EFetchResult::Finish:
return NUdf::TUnboxedValuePod::MakeFinish();
case EFetchResult::Yield:
@@ -513,7 +519,7 @@ private:
const TPasstroughtMap PasstroughtMap;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
+ const ui32 WideFieldsIndex;
};
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
index 06133c0dfb..f743fb39cd 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_todict.cpp
@@ -1025,7 +1025,7 @@ public:
, Key(key)
, ItemsCountHint(itemsCountHint)
, PasstroughKey(GetPasstroughtMap({Key}, Items).front())
- , Fields(Items.size(), nullptr)
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
{
GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
}
@@ -1036,13 +1036,14 @@ public:
} else if (!state.HasValue()) {
MakeState(ctx, state);
}
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
- for (auto i = 0U; i < Fields.size(); ++i)
+ for (auto i = 0U; i < Items.size(); ++i)
if (Key == Items[i] || Items[i]->GetDependencesCount() > 0U)
- Fields[i] = &Items[i]->RefValue(ctx);
+ fields[i] = &Items[i]->RefValue(ctx);
- switch (const auto result = Flow->FetchValues(ctx, Fields.data())) {
+ switch (const auto result = Flow->FetchValues(ctx, fields)) {
case EFetchResult::One:
statePtr->Insert(Key->GetValue(ctx).Release());
continue;
@@ -1191,7 +1192,7 @@ private:
const std::optional<size_t> PasstroughKey;
- mutable std::vector<NUdf::TUnboxedValue*> Fields;
+ const ui32 WideFieldsIndex;
};
template <typename TMapAccumulator, bool IsStream>
@@ -1544,7 +1545,7 @@ public:
, ItemsCountHint(itemsCountHint)
, PasstroughKey(GetPasstroughtMap({Key, Payload}, Items).front())
, PasstroughPayload(GetPasstroughtMap({Key, Payload}, Items).back())
- , Fields(Items.size(), nullptr)
+ , WideFieldsIndex(mutables.IncrementWideFieldsIndex(Items.size()))
{
GetDictionaryKeyTypes(KeyType, KeyTypes, IsTuple, Encoded, UseIHash);
}
@@ -1555,13 +1556,14 @@ public:
} else if (!state.HasValue()) {
MakeState(ctx, state);
}
+ auto** fields = ctx.WideFields.data() + WideFieldsIndex;
while (const auto statePtr = static_cast<TState*>(state.AsBoxed().Get())) {
- for (auto i = 0U; i < Fields.size(); ++i)
+ for (auto i = 0U; i < Items.size(); ++i)
if (Key == Items[i] || Payload == Items[i] || Items[i]->GetDependencesCount() > 0U)
- Fields[i] = &Items[i]->RefValue(ctx);
+ fields[i] = &Items[i]->RefValue(ctx);
- switch (const auto result = Flow->FetchValues(ctx, Fields.data())) {
+ switch (const auto result = Flow->FetchValues(ctx, fields)) {
case EFetchResult::One:
statePtr->Insert(Key->GetValue(ctx).Release(), Payload->GetValue(ctx).Release());
continue;
@@ -1717,6 +1719,7 @@ private:
const std::optional<size_t> PasstroughPayload;
mutable std::vector<NUdf::TUnboxedValue*> Fields;
+ const ui32 WideFieldsIndex;
};
template <typename TAccumulator>
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp
index d68fc71ac1..cd677661aa 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chain_map.cpp
@@ -143,7 +143,6 @@ public:
#endif
private:
EFetchResult CalculateFirst(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- Y_VERIFY_DEBUG(WideFieldsIndex + Inputs.size() <= ctx.WideFields.size());
auto** fields = ctx.WideFields.data() + WideFieldsIndex;
for (auto i = 0U; i < Inputs.size(); ++i) {
@@ -196,7 +195,6 @@ private:
}
EFetchResult CalculateOther(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- Y_VERIFY_DEBUG(WideFieldsIndex + Inputs.size() <= ctx.WideFields.size());
auto** fields = ctx.WideFields.data() + WideFieldsIndex;
for (auto i = 0U; i < Inputs.size(); ++i) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp
index 183174128f..fa4bc5df4c 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_chopper.cpp
@@ -41,7 +41,6 @@ public:
}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- Y_VERIFY_DEBUG(WideFieldsIndex + ItemArgs.size() <= ctx.WideFields.size());
auto** fields = ctx.WideFields.data() + WideFieldsIndex;
if (state.IsInvalid()) {
@@ -109,7 +108,6 @@ private:
return EFetchResult::One;
}
- Y_VERIFY_DEBUG(WideFieldsIndex + ItemArgs.size() <= ctx.WideFields.size());
auto** fields = ctx.WideFields.data() + WideFieldsIndex;
for (auto i = 0U; i < ItemArgs.size(); ++i)
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
index c5346af0a0..7f2c4fcdce 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -237,7 +237,6 @@ public:
const auto initUsage = MemLimit ? ctx.HolderFactory.GetMemoryUsed() : 0ULL;
- Y_VERIFY_DEBUG(WideFieldsIndex + Nodes.ItemNodes.size() <= ctx.WideFields.size());
auto **fields = ctx.WideFields.data() + WideFieldsIndex;
do {
@@ -623,7 +622,6 @@ public:
}
if (const auto ptr = static_cast<TState*>(state.AsBoxed().Get())) {
- Y_VERIFY_DEBUG(WideFieldsIndex + Nodes.ItemNodes.size() <= ctx.WideFields.size());
auto **fields = ctx.WideFields.data() + WideFieldsIndex;
while (EFetchResult::Finish != ptr->InputStatus) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
index e466075688..b65ba68c21 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_condense.cpp
@@ -47,7 +47,6 @@ public:
State[i]->SetValue(ctx, InitState[i]->GetValue(ctx));
}
- Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size());
auto** fields = ctx.WideFields.data() + WideFieldsIndex;
while (true) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp
index 04823e0b26..a3fb6dd736 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_filter.cpp
@@ -23,7 +23,6 @@ protected:
{}
NYql::NUdf::TUnboxedValue** GetFields(TComputationContext& ctx) const {
- Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size());
return ctx.WideFields.data() + WideFieldsIndex;
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp
index 47e7b1a5c4..32586e61a8 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_map.cpp
@@ -94,7 +94,6 @@ public:
{}
EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
- Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size());
auto** fields = ctx.WideFields.data() + WideFieldsIndex;
for (auto i = 0U; i < Items.size(); ++i)
@@ -188,7 +187,6 @@ public:
{}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- Y_VERIFY_DEBUG(WideFieldsIndex + Items.size() <= ctx.WideFields.size());
auto** fields = ctx.WideFields.data() + WideFieldsIndex;
for (auto i = 0U; i < Items.size(); ++i) {
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
index 1ff7fd10d1..4528d44c33 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
@@ -30,7 +30,7 @@ TComputationNodeFactory GetListTestFactory() {
};
}
-TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
if (list) {
return pb.ToFlow(TRuntimeNode(list, false));
} else {
@@ -334,9 +334,57 @@ TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable *list = n
}
}
+template<bool Flow>
+TRuntimeNode CreateNarrowFlatMap(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+ TTimer t(TString(__func__) + ": ");
+ auto flow = CreateFlow(pb, vecSize, list);
+
+ return pb.NarrowFlatMap(
+ pb.ExpandMap(flow,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
+ ),
+ [&] (TRuntimeNode::TList item) -> TRuntimeNode {
+ auto x = pb.NewOptional(item.front());
+ return Flow ? pb.ToFlow(x) : x;
+ }
+ );
+}
+
+TRuntimeNode CreateNarrowMultiMap(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+ TTimer t(TString(__func__) + ": ");
+ auto flow = CreateFlow(pb, vecSize, list);
+
+ return pb.NarrowMultiMap(
+ pb.ExpandMap(flow,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
+ ),
+ [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList {
+ return {item.front(), item.front()};
+ }
+ );
+}
+
+template<bool WithPayload>
+TRuntimeNode CreateSqueezeToSortedDict(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+ TTimer t(TString(__func__) + ": ");
+ auto flow = CreateFlow(pb, vecSize, list);
+
+ return pb.FlatMap(
+ pb.NarrowSqueezeToSortedDict(
+ pb.ExpandMap(flow,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
+ ),
+ /*all*/ false,
+ /*keySelector*/ [&](TRuntimeNode::TList item) { return item.front(); },
+ /*payloadSelector*/ [&](TRuntimeNode::TList ) { return WithPayload ? pb.NewDataLiteral<ui64>(0) : pb.NewVoid(); }
+ ),
+ [&] (TRuntimeNode item) { return pb.DictKeys(item); }
+ );
+}
+
Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
template<class T>
- void ParallelProgTest(T f, bool useLLVM, ui64 testResult) {
+ void ParallelProgTest(T f, bool useLLVM, ui64 testResult, size_t vecSize = 100'000) {
TTimer t("total: ");
const ui32 cacheSize = 10;
const ui32 inFlight = 3;
@@ -351,7 +399,6 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
- const ui32 vecSize = 100'000;
auto progReturn = f(pb, vecSize, list);
TExploringNodeVisitor explorer;
@@ -450,8 +497,21 @@ Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
Y_UNIT_TEST_QUAD(Skip, Wide, UseLLVM) {
ParallelProgTest(CreateSkip<Wide>, UseLLVM, 17389067750);
}
+
+ Y_UNIT_TEST_QUAD(NarrowFlatMap, Flow, UseLLVM) {
+ ParallelProgTest(CreateNarrowFlatMap<Flow>, UseLLVM, 17451450000);
+ }
+
+ Y_UNIT_TEST_TWIN(NarrowMultiMap, UseLLVM) {
+ ParallelProgTest(CreateNarrowMultiMap, UseLLVM, 17451450000ull * 2);
+ }
+
+ Y_UNIT_TEST_QUAD(SqueezeToSortedDict, WithPayload, UseLLVM) {
+ ParallelProgTest(CreateSqueezeToSortedDict<WithPayload>, UseLLVM, 125014500, 1000);
+ }
}
+
Y_UNIT_TEST_SUITE(ComputationPatternCache) {
Y_UNIT_TEST(Smoke) {
const ui32 cacheSize = 10;