about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author va-kuznecov <va-kuznecov@ydb.tech> 2022-07-20 19:26:43 +0300
committer va-kuznecov <va-kuznecov@ydb.tech> 2022-07-20 19:26:43 +0300
commit ff4f4c811479c0d1ff38f2b8b950056775bcb44c (patch)
tree 40a0dc16359485e7e70532cf452355cb439bf997
parent 8626350a97f249321bd4b0c23d0eea4211d940d1 (diff)
download ydb-ff4f4c811479c0d1ff38f2b8b950056775bcb44c.tar.gz
Add UT to reproduce races in computation nodes
-rw-r--r-- ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp 391
1 file changed, 321 insertions, 70 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
index 34ae86148e2..7fcb1975499 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
@@ -44,7 +44,11 @@ TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable *list = n
}
}
-TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
+template<bool Wide>
+TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list);
+
+template<>
+TRuntimeNode CreateFilter<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
TTimer t(TString(__func__) + ": ");
auto flow = CreateFlow(pb, vecSize, list);
@@ -56,7 +60,8 @@ TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list)
return pb.Filter(flow, handler);
}
-TRuntimeNode CreateWideFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+template<>
+TRuntimeNode CreateFilter<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
TTimer t(TString(__func__) + ": ");
auto flow = CreateFlow(pb, vecSize, list);
@@ -76,7 +81,48 @@ TRuntimeNode CreateWideFilter(TProgramBuilder& pb, size_t vecSize, TCallable *li
);
}
-TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+template<bool Wide> // Wide picks the wide (true) or narrow (false) Map program; only explicit specializations are visible in this diff.
+TRuntimeNode CreateMap(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr);
+
+template<> // Narrow variant: maps every item of the flow built by CreateFlow through `handler`.
+TRuntimeNode CreateMap<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
+ TTimer t(TString(__func__) + ": "); // label is this function's name; TTimer is defined outside this view (presumably reports elapsed time - confirm)
+ auto flow = CreateFlow(pb, vecSize, list);
+
+ auto handler = [&](TRuntimeNode node) -> TRuntimeNode { // per item: AggrEquals(Mod(node, 128), 0), both constants wrapped in optionals
+ return pb.AggrEquals(
+ pb.Mod(node, pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
+ pb.NewOptional(pb.NewDataLiteral<ui64>(0)));
+ };
+ return pb.Map(flow, handler);
+}
+
+template<> // Wide variant: ExpandMap -> WideMap(handler) -> NarrowMap over the flow built by CreateFlow.
+TRuntimeNode CreateMap<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
+ TTimer t(TString(__func__) + ": "); // label is this function's name; TTimer is defined outside this view
+ auto flow = CreateFlow(pb, vecSize, list);
+
+ auto handler = [&](TRuntimeNode::TList node) -> TRuntimeNode::TList { // same predicate as the narrow variant, applied to the first column
+ return {pb.AggrEquals(
+ pb.Mod(node.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
+ pb.NewOptional(pb.NewDataLiteral<ui64>(0)))};
+ };
+ return pb.NarrowMap(
+ pb.WideMap(
+ pb.ExpandMap(flow,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } // each item becomes a one-element list
+ ),
+ handler
+ ),
+ [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } // back to a single value: take the first element
+ );
+}
+
+template<bool Wide>
+TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr);
+
+template<>
+TRuntimeNode CreateCondense<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
TTimer t(TString(__func__) + ": ");
auto flow = CreateFlow(pb, vecSize, list);
@@ -90,80 +136,209 @@ TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable *list
return pb.Condense(flow, state, switcherHandler, updateHandler);
}
-/*
-TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize) {
+template<> // Wide variant: ExpandMap -> WideCondense1 -> NarrowMap over the flow built by CreateFlow.
+TRuntimeNode CreateCondense<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
TTimer t(TString(__func__) + ": ");
- std::vector<const TRuntimeNode> arr;
- arr.reserve(vecSize);
- for (ui64 i = 0; i < vecSize; ++i) {
- arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234));
- }
- TArrayRef<const TRuntimeNode> arrRef(std::move(arr));
- auto flow = pb.ToFlow(pb.AsList(arrRef));
+ auto flow = CreateFlow(pb, vecSize, list);
- TWideLambda keyExtractor = [&](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode {
- return pb.Add(item, state);
- };
- TBinaryWideLambda init;
- TTernaryWideLambda update;
- TBinaryWideLambda finish;
- return pb.WideLastCombiner(flow, keyExtractor , init, update, finish);
- //(flow, state, switcherHandler, updateHandler);
+ TRuntimeNode state = pb.NewDataLiteral<ui64>(0); // NOTE(review): never read below - the handler lambda's `state` parameter shadows it; candidate for removal
+ return pb.NarrowMap(
+ pb.WideCondense1(
+ /* stream */
+ pb.ExpandMap(flow,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } // each item becomes a one-element list
+ ),
+ /* init */
+ [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; },
+ /* switcher */
+ [&](TRuntimeNode::TList, TRuntimeNode::TList) -> TRuntimeNode { return pb.NewDataLiteral<bool>(false); }, // constant false for every item
+ /* handler */
+ [&](TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; } // new state = item + state
+ ),
+ [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } // back to a single value: take the first element
+ );
}
-*/
-Y_UNIT_TEST_SUITE(ComputationPatternCache) {
- Y_UNIT_TEST(Smoke) {
- const ui32 cacheSize = 10;
- TComputationPatternLRUCache cache(cacheSize);
+template<bool Wide>
+TRuntimeNode CreateChopper(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr);
- auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
+template<> // Narrow variant: feeds the flow built by CreateFlow into pb.Chopper.
+TRuntimeNode CreateChopper<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
+ TTimer t(TString(__func__) + ": "); // label is this function's name; TTimer is defined outside this view
+ auto flow = CreateFlow(pb, vecSize, list);
- for (ui32 i = 0; i < cacheSize; ++i) {
- std::shared_ptr<TPatternWithEnv> patternEnv = cache.CreateEnv();
- TScopedAlloc& alloc = patternEnv->Alloc;
- TTypeEnvironment& typeEnv = patternEnv->Env;
+ return pb.Chopper(flow,
+ /* keyExtractor */
+ [&](TRuntimeNode item) -> TRuntimeNode { return item; }, // the item itself is the key
+ /* groupSwitch */
+ [&](TRuntimeNode key, TRuntimeNode /*item*/) -> TRuntimeNode { // AggrEquals(Mod(key, 128), 0); the item argument is ignored
+ return pb.AggrEquals(
+ pb.Mod(key, pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
+ pb.NewOptional(pb.NewDataLiteral<ui64>(0)));
+ },
+ /* groupHandler */
+ [&](TRuntimeNode, TRuntimeNode list) -> TRuntimeNode { return list; } // groups are passed through unchanged
+ );
+}
- TProgramBuilder pb(typeEnv, *functionRegistry);
+template<> // Wide variant: ExpandMap -> WideChopper -> NarrowMap over the flow built by CreateFlow.
+TRuntimeNode CreateChopper<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
+ TTimer t(TString(__func__) + ": "); // label is this function's name; TTimer is defined outside this view
+ auto flow = CreateFlow(pb, vecSize, list);
- auto progReturn = pb.NewDataLiteral<NYql::NUdf::EDataSlot::String>("qwerty");
+ return pb.NarrowMap(
+ pb.WideChopper(
+ /* stream */
+ pb.ExpandMap(flow,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } // each item becomes a one-element list
+ ),
+ /* keyExtractor */
+ [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; }, // the item list itself is the key
+ /* groupSwitch */
+ [&](TRuntimeNode::TList key, TRuntimeNode::TList /*item*/) -> TRuntimeNode { // AggrEquals(Mod(key.front(), 128), 0); the item argument is ignored
+ return pb.AggrEquals(
+ pb.Mod(key.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
+ pb.NewOptional(pb.NewDataLiteral<ui64>(0)));
+ },
+ /* groupHandler */
+ [&](TRuntimeNode::TList, TRuntimeNode input) { return pb.WideMap(input, [](TRuntimeNode::TList items) { return items; }); } // identity WideMap over each group
+ ),
+ [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } // back to a single value: take the first element
+ );
+}
- TExploringNodeVisitor explorer;
- explorer.Walk(progReturn.GetNode(), typeEnv);
+template<bool Wide>
+TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr);
- TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(),
- functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
- "OFF", EGraphPerProcess::Multi);
+template<> // Narrow variant: feeds the flow built by CreateFlow into pb.CombineCore.
+TRuntimeNode CreateCombine<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
+ TTimer t(TString(__func__) + ": "); // label is this function's name; TTimer is defined outside this view
+ auto flow = CreateFlow(pb, vecSize, list);
- {
- auto guard = patternEnv->Env.BindAllocator();
- patternEnv->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts);
- }
- cache.EmplacePattern(TString((char)('a' + i)), patternEnv);
- }
+ return pb.CombineCore(
+ /* stream */
+ flow,
+ /* keyExtractor */
+ [&] (TRuntimeNode /*item*/) -> TRuntimeNode { return pb.NewDataLiteral<ui64>(0);}, // every item gets the same constant key 0
+ /* init */
+ [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return item; }, // initial state is the first item
+ /* update */
+ [&] (TRuntimeNode /* key */, TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); }, // new state = item + state
+ /* finish */
+ [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return pb.NewOptional(item); }, // result is wrapped in an optional
+ /* memlimit */
+ 64 << 20
+ );
+}
- for (ui32 i = 0; i < cacheSize; ++i) {
- auto key = TString((char)('a' + i));
+template<> // Wide variant: ExpandMap -> WideCombiner -> NarrowMap over the flow built by CreateFlow.
+TRuntimeNode CreateCombine<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
- auto randomProvider = CreateDeterministicRandomProvider(1);
- auto timeProvider = CreateDeterministicTimeProvider(10000000);
- TScopedAlloc graphAlloc;
- auto patternEnv = cache.Find(key);
- UNIT_ASSERT(patternEnv);
- TComputationPatternOpts opts(patternEnv->Alloc.Ref(), patternEnv->Env, GetBuiltinFactory(),
- functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
- "OFF", EGraphPerProcess::Multi);
+ TTimer t(TString(__func__) + ": "); // label is this function's name; TTimer is defined outside this view
+ auto flow = CreateFlow(pb, vecSize, list);
- auto graph = patternEnv->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
- auto value = graph->GetValue();
- UNIT_ASSERT_EQUAL(value.AsStringRef(), NYql::NUdf::TStringRef("qwerty"));
- }
+ return pb.NarrowMap(
+ pb.WideCombiner(
+ /* stream */
+ pb.ExpandMap(flow,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } // each item becomes a one-element list
+ ),
+ /* memlimit */
+ 64 << 20,
+ /* keyExtractor */
+ [&] (TRuntimeNode::TList /*item*/) -> TRuntimeNode::TList { return {pb.NewDataLiteral<ui64>(0)};}, // every item gets the same constant key 0
+ /* init */
+ [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; }, // initial state is the first item
+ /* update */
+ [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { // new state = item + state
+ return {pb.Add(item.front(), state.front())};
+ },
+ /* finish */
+ [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {pb.NewOptional(item.front())}; } // result is wrapped in an optional
+ ),
+ [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } // back to a single value: take the first element
+ );
+}
+
+template<bool Wide> // Wide picks the wide (true) or narrow (false) Chain1Map program; only explicit specializations are visible in this diff.
+TRuntimeNode CreateChain1Map(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr);
+
+template<> // Narrow variant: feeds the flow built by CreateFlow into pb.Chain1Map.
+TRuntimeNode CreateChain1Map<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
+ TTimer t(TString(__func__) + ": "); // label is this function's name; TTimer is defined outside this view
+ auto flow = CreateFlow(pb, vecSize, list);
+
+ return pb.Chain1Map(
+ flow,
+ /* init */
+ [&] (TRuntimeNode item) -> TRuntimeNode { return item; }, // first item is returned as-is
+ /* update */
+ [&] (TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); } // item + state
+ );
+}
+
+template<> // Wide variant: ExpandMap -> WideChain1Map -> NarrowMap over the flow built by CreateFlow.
+TRuntimeNode CreateChain1Map<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
+ TTimer t(TString(__func__) + ": "); // label is this function's name; TTimer is defined outside this view
+ auto flow = CreateFlow(pb, vecSize, list);
+
+ return pb.NarrowMap(
+ pb.WideChain1Map(
+ /* stream */
+ pb.ExpandMap(flow,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } // each item becomes a one-element list
+ ),
+ /* init */
+ [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; }, // first item list is returned as-is
+ /* update */
+ [&] (TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; } // item + state on the first column
+ ),
+ [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); } // back to a single value: take the first element
+ );
+}
+
+template<bool Wide> // Builds pb.Discard over the flow from CreateFlow; Wide first expands each item into a one-element list.
+TRuntimeNode CreateDiscard(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+ TTimer t(TString(__func__) + ": "); // label is this function's name; TTimer is defined outside this view
+ auto flow = CreateFlow(pb, vecSize, list);
+
+ if (Wide) { // NOTE(review): Wide is a compile-time constant; `if constexpr` would fit if the build is C++17 or later - confirm
+ return pb.Discard(
+ pb.ExpandMap(flow,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
+ )
+ );
+ } else {
+ return pb.Discard(flow);
+ }
+}
+
+template<bool Wide> // Builds pb.Skip with a count of 500 over the flow from CreateFlow; Wide wraps it in ExpandMap / NarrowMap.
+TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+ TTimer t(TString(__func__) + ": "); // label is this function's name; TTimer is defined outside this view
+ auto flow = CreateFlow(pb, vecSize, list);
+
+ auto count = pb.NewDataLiteral<ui64>(500); // count argument passed to pb.Skip in both branches
+ if (Wide) { // NOTE(review): Wide is a compile-time constant; `if constexpr` would fit if the build is C++17 or later - confirm
+ return pb.NarrowMap(
+ pb.Skip(
+ pb.ExpandMap(flow,
+ [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } // each item becomes a one-element list
+ ),
+ count
+ ),
+ [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); } // back to a single value: take the first element
+ );
+ } else {
+ return pb.Skip(flow, count);
}
+}
+Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
template<class T>
void ParallelProgTest(T f, ui64 testResult) {
const ui32 cacheSize = 10;
- const ui32 inFlight = 2;
+ const ui32 inFlight = 3;
TComputationPatternLRUCache cache(cacheSize);
auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
@@ -175,7 +350,7 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
- const ui32 vecSize = 10'000;
+ const ui32 vecSize = 100'000;
auto progReturn = f(pb, vecSize, list);
TExploringNodeVisitor explorer;
@@ -200,8 +375,10 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
const auto data = genData();
+ std::vector<std::vector<ui64>> results(inFlight);
+
NPar::LocalExecutor().RunAdditionalThreads(inFlight);
- NPar::LocalExecutor().ExecRange([&](int /*id*/) {
+ NPar::LocalExecutor().ExecRange([&](int id) {
for (ui32 i = 0; i < 100; ++i) {
auto key = "a";
@@ -226,26 +403,100 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
ui64 acc = 0;
TUnboxedValue v = graph->GetValue();
- UNIT_ASSERT(v.HasValue());
while (v.HasValue()) {
acc += v.Get<ui64>();
v = graph->GetValue();
}
- UNIT_ASSERT_VALUES_EQUAL(acc, testResult);
+ results[id].push_back(acc);
}
}, 0, inFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
+
+ for (auto threadResults : results) {
+ for (auto res : threadResults) {
+ UNIT_ASSERT_VALUES_EQUAL(res, testResult);
+ }
+ }
+ }
+
+ Y_UNIT_TEST_TWIN(Filter, Wide) { // `Wide` comes from Y_UNIT_TEST_TWIN (presumably one run per value - macro is outside this view)
+ ParallelProgTest(CreateFilter<Wide>, 136480896); // second argument is the sum every parallel run must produce
+ }
+
+ Y_UNIT_TEST_TWIN(Map, Wide) { // `Wide` comes from Y_UNIT_TEST_TWIN (presumably one run per value - macro is outside this view)
+ ParallelProgTest(CreateMap<Wide>, 782); // second argument is the sum every parallel run must produce
+ }
+
+ Y_UNIT_TEST_TWIN(Condense, Wide) { // `Wide` comes from Y_UNIT_TEST_TWIN (presumably one run per value - macro is outside this view)
+ ParallelProgTest(CreateCondense<Wide>, 17451450000); // second argument is the sum every parallel run must produce
+ }
+
+ Y_UNIT_TEST_TWIN(Chopper, Wide) { // `Wide` comes from Y_UNIT_TEST_TWIN (presumably one run per value - macro is outside this view)
+ ParallelProgTest(CreateChopper<Wide>, 17451450000); // second argument is the sum every parallel run must produce
+ }
+
+ Y_UNIT_TEST_TWIN(Combine, Wide) { // `Wide` comes from Y_UNIT_TEST_TWIN (presumably one run per value - macro is outside this view)
+ ParallelProgTest(CreateCombine<Wide>, 17451450000); // second argument is the sum every parallel run must produce
}
- Y_UNIT_TEST(ParallelFilter) {
- ParallelProgTest(CreateFilter, 10098816);
+ Y_UNIT_TEST_TWIN(Chain1Map, Wide) { // `Wide` comes from Y_UNIT_TEST_TWIN (presumably one run per value - macro is outside this view)
+ ParallelProgTest(CreateChain1Map<Wide>, 789247892400000); // second argument is the sum every parallel run must produce
}
- Y_UNIT_TEST(ParallelWideFilter) {
- ParallelProgTest(CreateWideFilter, 10098816);
+ Y_UNIT_TEST_TWIN(Discard, Wide) { // `Wide` comes from Y_UNIT_TEST_TWIN (presumably one run per value - macro is outside this view)
+ ParallelProgTest(CreateDiscard<Wide>, 0); // expected sum is 0
}
- Y_UNIT_TEST(ParallelCondense) {
- ParallelProgTest(CreateCondense, 1295145000);
+ Y_UNIT_TEST_TWIN(Skip, Wide) { // `Wide` comes from Y_UNIT_TEST_TWIN (presumably one run per value - macro is outside this view)
+ ParallelProgTest(CreateSkip<Wide>, 17389067750); // second argument is the sum every parallel run must produce
+ }
+}
+
+Y_UNIT_TEST_SUITE(ComputationPatternCache) {
+ Y_UNIT_TEST(Smoke) { // Stores cacheSize patterns under keys "a".."j", then finds each one, clones a graph from it and checks its value.
+ const ui32 cacheSize = 10;
+ TComputationPatternLRUCache cache(cacheSize);
+
+ auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
+
+ for (ui32 i = 0; i < cacheSize; ++i) { // phase 1: build one pattern per key and put it into the cache
+ std::shared_ptr<TPatternWithEnv> patternEnv = cache.CreateEnv();
+ TScopedAlloc& alloc = patternEnv->Alloc;
+ TTypeEnvironment& typeEnv = patternEnv->Env;
+
+ TProgramBuilder pb(typeEnv, *functionRegistry);
+
+ auto progReturn = pb.NewDataLiteral<NYql::NUdf::EDataSlot::String>("qwerty"); // the whole program is a single string literal
+
+ TExploringNodeVisitor explorer;
+ explorer.Walk(progReturn.GetNode(), typeEnv);
+
+ TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(),
+ functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
+ "OFF", EGraphPerProcess::Multi);
+
+ {
+ auto guard = patternEnv->Env.BindAllocator(); // guard is held only while the pattern is built
+ patternEnv->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts);
+ }
+ cache.EmplacePattern(TString((char)('a' + i)), patternEnv); // key is the single character 'a' + i
+ }
+
+ for (ui32 i = 0; i < cacheSize; ++i) { // phase 2: look every key up again and evaluate a graph cloned from the cached pattern
+ auto key = TString((char)('a' + i));
+
+ auto randomProvider = CreateDeterministicRandomProvider(1);
+ auto timeProvider = CreateDeterministicTimeProvider(10000000);
+ TScopedAlloc graphAlloc;
+ auto patternEnv = cache.Find(key);
+ UNIT_ASSERT(patternEnv); // every key stored in phase 1 must be found
+ TComputationPatternOpts opts(patternEnv->Alloc.Ref(), patternEnv->Env, GetBuiltinFactory(),
+ functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
+ "OFF", EGraphPerProcess::Multi);
+
+ auto graph = patternEnv->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); // graph is cloned with its own allocator (graphAlloc)
+ auto value = graph->GetValue();
+ UNIT_ASSERT_EQUAL(value.AsStringRef(), NYql::NUdf::TStringRef("qwerty")); // must match the literal the pattern was built from
+ }
+ }
Y_UNIT_TEST(AddPerf) {
@@ -317,7 +568,7 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
}
}
- Y_UNIT_TEST(FilterPerf) {
+ Y_UNIT_TEST_TWIN(FilterPerf, Wide) {
TScopedAlloc alloc;
TTypeEnvironment typeEnv(alloc);
@@ -328,7 +579,7 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
Cerr << "vecSize: " << vecSize << Endl;
const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
- auto progReturn = true ? CreateFilter(pb, vecSize, list) : CreateWideFilter(pb, vecSize, list);
+ auto progReturn = CreateFilter<Wide>(pb, vecSize, list);
TExploringNodeVisitor explorer;
explorer.Walk(progReturn.GetNode(), typeEnv);