diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-07-20 19:26:43 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-07-20 19:26:43 +0300 |
commit | ff4f4c811479c0d1ff38f2b8b950056775bcb44c (patch) | |
tree | 40a0dc16359485e7e70532cf452355cb439bf997 | |
parent | 8626350a97f249321bd4b0c23d0eea4211d940d1 (diff) | |
download | ydb-ff4f4c811479c0d1ff38f2b8b950056775bcb44c.tar.gz |
Add UT to reproduce races in computation nodes
-rw-r--r-- | ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp | 391 |
1 files changed, 321 insertions, 70 deletions
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp index 34ae86148e2..7fcb1975499 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp @@ -44,7 +44,11 @@ TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable *list = n } } -TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list) { +template<bool Wide> +TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list); + +template<> +TRuntimeNode CreateFilter<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); @@ -56,7 +60,8 @@ TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list) return pb.Filter(flow, handler); } -TRuntimeNode CreateWideFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { +template<> +TRuntimeNode CreateFilter<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); @@ -76,7 +81,48 @@ TRuntimeNode CreateWideFilter(TProgramBuilder& pb, size_t vecSize, TCallable *li ); } -TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { +template<bool Wide> +TRuntimeNode CreateMap(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr); + +template<> +TRuntimeNode CreateMap<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto handler = [&](TRuntimeNode node) -> TRuntimeNode { + return pb.AggrEquals( + pb.Mod(node, pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0))); + }; + return pb.Map(flow, handler); +} + +template<> +TRuntimeNode CreateMap<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto handler = [&](TRuntimeNode::TList node) -> TRuntimeNode::TList { + return {pb.AggrEquals( + pb.Mod(node.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0)))}; + }; + return pb.NarrowMap( + pb.WideMap( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + handler + ), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } + ); +} + +template<bool Wide> +TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr); + +template<> +TRuntimeNode CreateCondense<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); @@ -90,80 +136,209 @@ TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable *list return pb.Condense(flow, state, switcherHandler, updateHandler); } -/* -TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize) { +template<> +TRuntimeNode CreateCondense<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { TTimer t(TString(__func__) + ": "); - std::vector<const TRuntimeNode> arr; - arr.reserve(vecSize); - for (ui64 i = 0; i < vecSize; ++i) { - arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234)); - } - TArrayRef<const TRuntimeNode> arrRef(std::move(arr)); - auto flow = pb.ToFlow(pb.AsList(arrRef)); + auto flow = CreateFlow(pb, vecSize, list); - TWideLambda keyExtractor = [&](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { - return pb.Add(item, state); - }; - TBinaryWideLambda init; - TTernaryWideLambda update; - TBinaryWideLambda finish; - return pb.WideLastCombiner(flow, keyExtractor , init, update, finish); - //(flow, state, switcherHandler, updateHandler); + TRuntimeNode state = pb.NewDataLiteral<ui64>(0); + return pb.NarrowMap( + pb.WideCondense1( + /* stream */ + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + /* init */ + [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; }, + /* switcher */ + [&](TRuntimeNode::TList, TRuntimeNode::TList) -> TRuntimeNode { return pb.NewDataLiteral<bool>(false); }, + /* handler */ + [&](TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; } + ), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } + ); } -*/ -Y_UNIT_TEST_SUITE(ComputationPatternCache) { - Y_UNIT_TEST(Smoke) { - const ui32 cacheSize = 10; - TComputationPatternLRUCache cache(cacheSize); +template<bool Wide> +TRuntimeNode CreateChopper(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr); - auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); +template<> +TRuntimeNode CreateChopper<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); - for (ui32 i = 0; i < cacheSize; ++i) { - std::shared_ptr<TPatternWithEnv> patternEnv = cache.CreateEnv(); - TScopedAlloc& alloc = patternEnv->Alloc; - TTypeEnvironment& typeEnv = patternEnv->Env; + return pb.Chopper(flow, + /* keyExtractor */ + [&](TRuntimeNode item) -> TRuntimeNode { return item; }, + /* groupSwitch */ + [&](TRuntimeNode key, TRuntimeNode /*item*/) -> TRuntimeNode { + return pb.AggrEquals( + pb.Mod(key, pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0))); + }, + /* groupHandler */ + [&](TRuntimeNode, TRuntimeNode list) -> TRuntimeNode { return list; } + ); +}; - TProgramBuilder pb(typeEnv, *functionRegistry); +template<> +TRuntimeNode CreateChopper<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); - auto progReturn = pb.NewDataLiteral<NYql::NUdf::EDataSlot::String>("qwerty"); + return pb.NarrowMap( + pb.WideChopper( + /* stream */ + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + /* keyExtractor */ + [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; }, + /* groupSwitch */ + [&](TRuntimeNode::TList key, TRuntimeNode::TList /*item*/) -> TRuntimeNode { + return pb.AggrEquals( + pb.Mod(key.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0))); + }, + /* groupHandler */ + [&](TRuntimeNode::TList, TRuntimeNode input) { return pb.WideMap(input, [](TRuntimeNode::TList items) { return items; }); } + ), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } + ); +}; - TExploringNodeVisitor explorer; - explorer.Walk(progReturn.GetNode(), typeEnv); +template<bool Wide> +TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr); - TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(), - functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, - "OFF", EGraphPerProcess::Multi); +template<> +TRuntimeNode CreateCombine<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); - { - auto guard = patternEnv->Env.BindAllocator(); - patternEnv->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts); - } - cache.EmplacePattern(TString((char)('a' + i)), patternEnv); - } + return pb.CombineCore( + /* stream */ + flow, + /* keyExtractor */ + [&] (TRuntimeNode /*item*/) -> TRuntimeNode { return pb.NewDataLiteral<ui64>(0);}, + /* init */ + [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return item; }, + /* update */ + [&] (TRuntimeNode /* key */, TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); }, + /* finish */ + [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return pb.NewOptional(item); }, + /* memlimit */ + 64 << 20 + ); +}; - for (ui32 i = 0; i < cacheSize; ++i) { - auto key = TString((char)('a' + i)); +template<> +TRuntimeNode CreateCombine<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { - auto randomProvider = CreateDeterministicRandomProvider(1); - auto timeProvider = CreateDeterministicTimeProvider(10000000); - TScopedAlloc graphAlloc; - auto patternEnv = cache.Find(key); - UNIT_ASSERT(patternEnv); - TComputationPatternOpts opts(patternEnv->Alloc.Ref(), patternEnv->Env, GetBuiltinFactory(), - functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, - "OFF", EGraphPerProcess::Multi); + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); - auto graph = patternEnv->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); - auto value = graph->GetValue(); - UNIT_ASSERT_EQUAL(value.AsStringRef(), NYql::NUdf::TStringRef("qwerty")); - } + return pb.NarrowMap( + pb.WideCombiner( + /* stream */ + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + /* memlimit */ + 64 << 20, + /* keyExtractor */ + [&] (TRuntimeNode::TList /*item*/) -> TRuntimeNode::TList { return {pb.NewDataLiteral<ui64>(0)};}, + /* init */ + [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; }, + /* update */ + [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { + return {pb.Add(item.front(), state.front())}; + }, + /* finish */ + [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {pb.NewOptional(item.front())}; } + ), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } + ); +}; + +template<bool Wide> +TRuntimeNode CreateChain1Map(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr); + +template<> +TRuntimeNode CreateChain1Map<false>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.Chain1Map( + flow, + /* init */ + [&] (TRuntimeNode item) -> TRuntimeNode { return item; }, + /* update */ + [&] (TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); } + ); +} + +template<> +TRuntimeNode CreateChain1Map<true>(TProgramBuilder& pb, size_t vecSize, TCallable *list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.NarrowMap( + pb.WideChain1Map( + /* stream */ + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + /* init */ + [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; }, + /* update */ + [&] (TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; } + ), + [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); } + ); +} + +template<bool Wide> +TRuntimeNode CreateDiscard(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + if (Wide) { + return pb.Discard( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ) + ); + } else { + return pb.Discard(flow); + } +} + +template<bool Wide> +TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto count = pb.NewDataLiteral<ui64>(500); + if (Wide) { + return pb.NarrowMap( + pb.Skip( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + count + ), + [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); } + ); + } else { + return pb.Skip(flow, count); } +} +Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { template<class T> void ParallelProgTest(T f, ui64 testResult) { const ui32 cacheSize = 10; - const ui32 inFlight = 2; + const ui32 inFlight = 3; TComputationPatternLRUCache cache(cacheSize); auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); @@ -175,7 +350,7 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id)); const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build(); - const ui32 vecSize = 10'000; + const ui32 vecSize = 100'000; auto progReturn = f(pb, vecSize, list); TExploringNodeVisitor explorer; @@ -200,8 +375,10 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { const auto data = genData(); + std::vector<std::vector<ui64>> results(inFlight); + NPar::LocalExecutor().RunAdditionalThreads(inFlight); - NPar::LocalExecutor().ExecRange([&](int /*id*/) { + NPar::LocalExecutor().ExecRange([&](int id) { for (ui32 i = 0; i < 100; ++i) { auto key = "a"; @@ -226,26 +403,100 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { ui64 acc = 0; TUnboxedValue v = graph->GetValue(); - UNIT_ASSERT(v.HasValue()); while (v.HasValue()) { acc += v.Get<ui64>(); v = graph->GetValue(); } - UNIT_ASSERT_VALUES_EQUAL(acc, testResult); + results[id].push_back(acc); } }, 0, inFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); + + for (auto threadResults : results) { + for (auto res : threadResults) { + UNIT_ASSERT_VALUES_EQUAL(res, testResult); + } + } + } + + Y_UNIT_TEST_TWIN(Filter, Wide) { + ParallelProgTest(CreateFilter<Wide>, 136480896); + } + + Y_UNIT_TEST_TWIN(Map, Wide) { + ParallelProgTest(CreateMap<Wide>, 782); + } + + Y_UNIT_TEST_TWIN(Condense, Wide) { + ParallelProgTest(CreateCondense<Wide>, 17451450000); + } + + Y_UNIT_TEST_TWIN(Chopper, Wide) { + ParallelProgTest(CreateChopper<Wide>, 17451450000); + } + + Y_UNIT_TEST_TWIN(Combine, Wide) { + ParallelProgTest(CreateCombine<Wide>, 17451450000); } - Y_UNIT_TEST(ParallelFilter) { - ParallelProgTest(CreateFilter, 10098816); + Y_UNIT_TEST_TWIN(Chain1Map, Wide) { + ParallelProgTest(CreateChain1Map<Wide>, 789247892400000); } - Y_UNIT_TEST(ParallelWideFilter) { - ParallelProgTest(CreateWideFilter, 10098816); + Y_UNIT_TEST_TWIN(Discard, Wide) { + ParallelProgTest(CreateDiscard<Wide>, 0); } - Y_UNIT_TEST(ParallelCondense) { - ParallelProgTest(CreateCondense, 1295145000); + Y_UNIT_TEST_TWIN(Skip, Wide) { + ParallelProgTest(CreateSkip<Wide>, 17389067750); + } +} + +Y_UNIT_TEST_SUITE(ComputationPatternCache) { + Y_UNIT_TEST(Smoke) { + const ui32 cacheSize = 10; + TComputationPatternLRUCache cache(cacheSize); + + auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); + + for (ui32 i = 0; i < cacheSize; ++i) { + std::shared_ptr<TPatternWithEnv> patternEnv = cache.CreateEnv(); + TScopedAlloc& alloc = patternEnv->Alloc; + TTypeEnvironment& typeEnv = patternEnv->Env; + + TProgramBuilder pb(typeEnv, *functionRegistry); + + auto progReturn = pb.NewDataLiteral<NYql::NUdf::EDataSlot::String>("qwerty"); + + TExploringNodeVisitor explorer; + explorer.Walk(progReturn.GetNode(), typeEnv); + + TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(), + functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, + "OFF", EGraphPerProcess::Multi); + + { + auto guard = patternEnv->Env.BindAllocator(); + patternEnv->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts); + } + cache.EmplacePattern(TString((char)('a' + i)), patternEnv); + } + + for (ui32 i = 0; i < cacheSize; ++i) { + auto key = TString((char)('a' + i)); + + auto randomProvider = CreateDeterministicRandomProvider(1); + auto timeProvider = CreateDeterministicTimeProvider(10000000); + TScopedAlloc graphAlloc; + auto patternEnv = cache.Find(key); + UNIT_ASSERT(patternEnv); + TComputationPatternOpts opts(patternEnv->Alloc.Ref(), patternEnv->Env, GetBuiltinFactory(), + functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, + "OFF", EGraphPerProcess::Multi); + + auto graph = patternEnv->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); + auto value = graph->GetValue(); + UNIT_ASSERT_EQUAL(value.AsStringRef(), NYql::NUdf::TStringRef("qwerty")); + } } Y_UNIT_TEST(AddPerf) { @@ -317,7 +568,7 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { } } - Y_UNIT_TEST(FilterPerf) { + Y_UNIT_TEST_TWIN(FilterPerf, Wide) { TScopedAlloc alloc; TTypeEnvironment typeEnv(alloc); @@ -328,7 +579,7 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { Cerr << "vecSize: " << vecSize << Endl; const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id)); const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build(); - auto progReturn = true ? CreateFilter(pb, vecSize, list) : CreateWideFilter(pb, vecSize, list); + auto progReturn = CreateFilter<Wide>(pb, vecSize, list); TExploringNodeVisitor explorer; explorer.Walk(progReturn.GetNode(), typeEnv); |