diff options
author | vvvv <vvvv@yandex-team.com> | 2024-11-07 04:19:26 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.com> | 2024-11-07 04:29:50 +0300 |
commit | 2661be00f3bc47590fda9218bf0386d6355c8c88 (patch) | |
tree | 3d316c07519191283d31c5f537efc6aabb42a2f0 /yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp | |
parent | cf2a23963ac10add28c50cc114fbf48953eca5aa (diff) | |
download | ydb-2661be00f3bc47590fda9218bf0386d6355c8c88.tar.gz |
Moved yql/minikql YQL-19206
init
[nodiff:caesar]
commit_hash:d1182ef7d430ccf7e4d37ed933c7126d7bd5d6e4
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp')
-rw-r--r-- | yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp | 883 |
1 files changed, 883 insertions, 0 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp new file mode 100644 index 0000000000..0a28de9162 --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp @@ -0,0 +1,883 @@ +#include "library/cpp/threading/local_executor/local_executor.h" +#include "yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h" +#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h> +#include <yql/essentials/minikql/computation/mkql_computation_pattern_cache.h> +#include <yql/essentials/minikql/mkql_type_builder.h> +#include <yql/essentials/minikql/mkql_node_serialization.h> +#include <yql/essentials/utils/yql_panic.h> +#include <yql/essentials/minikql/mkql_node.h> +#include <yql/essentials/minikql/mkql_program_builder.h> +#include <yql/essentials/minikql/computation/mkql_computation_node.h> +#include <yql/essentials/minikql/computation/mkql_computation_node_impl.h> +#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> +#include <yql/essentials/minikql/comp_nodes/mkql_factories.h> +#include <contrib/ydb/library/yql/dq/proto/dq_tasks.pb.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/datetime/cputimer.h> + +namespace NKikimr { +namespace NMiniKQL { + +using namespace NYql::NUdf; + +TComputationNodeFactory GetListTestFactory() { + return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + if (callable.GetType()->GetName() == "TestList") { + return new TExternalComputationNode(ctx.Mutables); + } + return GetBuiltinFactory()(callable, ctx); + }; +} + +TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + if (list) { + return pb.ToFlow(TRuntimeNode(list, false)); + } else { + std::vector<const TRuntimeNode> arr; + arr.reserve(vecSize); + for (ui64 i = 0; i < vecSize; ++i) { + arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234)); + } + TArrayRef<const TRuntimeNode> arrRef(std::move(arr)); + return pb.ToFlow(pb.AsList(arrRef)); + } +} + +template<bool Wide> +TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable* list); + +template<> +TRuntimeNode CreateFilter<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto handler = [&](TRuntimeNode node) -> TRuntimeNode { + return pb.AggrEquals( + pb.Mod(node, pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0))); + }; + return pb.Filter(flow, handler); +} + +template<> +TRuntimeNode CreateFilter<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto handler = [&](TRuntimeNode::TList node) -> TRuntimeNode { + return pb.AggrEquals( + pb.Mod(node.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0))); + }; + return pb.NarrowMap( + pb.WideFilter( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + handler + ), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } + ); +} + +template<bool Wide> +TRuntimeNode CreateMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr); + +template<> +TRuntimeNode CreateMap<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto handler = [&](TRuntimeNode node) -> TRuntimeNode { + return pb.AggrEquals( + pb.Mod(node, pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0))); + }; + return pb.Map(flow, handler); +} + +template<> +TRuntimeNode CreateMap<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto handler = [&](TRuntimeNode::TList node) -> TRuntimeNode::TList { + return {pb.AggrEquals( + pb.Mod(node.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0)))}; + }; + return pb.NarrowMap( + pb.WideMap( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + handler + ), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } + ); +} + +template<bool Wide> +TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr); + +template<> +TRuntimeNode CreateCondense<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto switcherHandler = [&](TRuntimeNode, TRuntimeNode) -> TRuntimeNode { + return pb.NewDataLiteral<bool>(false); + }; + auto updateHandler = [&](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { + return pb.Add(item, state); + }; + TRuntimeNode state = pb.NewDataLiteral<ui64>(0); + return pb.Condense(flow, state, switcherHandler, updateHandler); +} + +template<> +TRuntimeNode CreateCondense<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + TRuntimeNode state = pb.NewDataLiteral<ui64>(0); + return pb.NarrowMap( + pb.WideCondense1( + /* stream */ + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + /* init */ + [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; }, + /* switcher */ + [&](TRuntimeNode::TList, TRuntimeNode::TList) -> TRuntimeNode { return pb.NewDataLiteral<bool>(false); }, + /* handler */ + [&](TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; } + ), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } + ); +} + +template<bool Wide> +TRuntimeNode CreateChopper(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr); + +template<> +TRuntimeNode CreateChopper<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.Chopper(flow, + /* keyExtractor */ + [&](TRuntimeNode item) -> TRuntimeNode { return item; }, + /* groupSwitch */ + [&](TRuntimeNode key, TRuntimeNode /*item*/) -> TRuntimeNode { + return pb.AggrEquals( + pb.Mod(key, pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0))); + }, + /* groupHandler */ + [&](TRuntimeNode, TRuntimeNode list) -> TRuntimeNode { return list; } + ); +}; + +template<> +TRuntimeNode CreateChopper<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.NarrowMap( + pb.WideChopper( + /* stream */ + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + /* keyExtractor */ + [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; }, + /* groupSwitch */ + [&](TRuntimeNode::TList key, TRuntimeNode::TList /*item*/) -> TRuntimeNode { + return pb.AggrEquals( + pb.Mod(key.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0))); + }, + /* groupHandler */ + [&](TRuntimeNode::TList, TRuntimeNode input) { return pb.WideMap(input, [](TRuntimeNode::TList items) { return items; }); } + ), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } + ); +}; + +template<bool Wide> +TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr); + +template<> +TRuntimeNode CreateCombine<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.CombineCore( + /* stream */ + flow, + /* keyExtractor */ + [&] (TRuntimeNode /*item*/) -> TRuntimeNode { return pb.NewDataLiteral<ui64>(0);}, + /* init */ + [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return item; }, + /* update */ + [&] (TRuntimeNode /* key */, TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); }, + /* finish */ + [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return pb.NewOptional(item); }, + /* memlimit */ + 64 << 20 + ); +}; + +template<> +TRuntimeNode CreateCombine<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.NarrowMap( + pb.WideCombiner( + /* stream */ + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + /* memlimit */ + 64 << 20, + /* keyExtractor */ + [&] (TRuntimeNode::TList /*item*/) -> TRuntimeNode::TList { return {pb.NewDataLiteral<ui64>(0)};}, + /* init */ + [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; }, + /* update */ + [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { + return {pb.Add(item.front(), state.front())}; + }, + /* finish */ + [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {pb.NewOptional(item.front())}; } + ), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } + ); +}; + +template<bool Wide> +TRuntimeNode CreateChain1Map(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr); + +template<> +TRuntimeNode CreateChain1Map<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.Chain1Map( + flow, + /* init */ + [&] (TRuntimeNode item) -> TRuntimeNode { return item; }, + /* update */ + [&] (TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); } + ); +} + +template<> +TRuntimeNode CreateChain1Map<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.NarrowMap( + pb.WideChain1Map( + /* stream */ + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + /* init */ + [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; }, + /* update */ + [&] (TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; } + ), + [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); } + ); +} + +template<bool Wide> +TRuntimeNode CreateDiscard(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + if (Wide) { + return pb.Discard( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ) + ); + } else { + return pb.Discard(flow); + } +} + +template<bool Wide> +TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto count = pb.NewDataLiteral<ui64>(500); + if (Wide) { + return pb.NarrowMap( + pb.Skip( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + count + ), + [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); } + ); + } else { + return pb.Skip(flow, count); + } +} + +template<bool Flow> +TRuntimeNode CreateNarrowFlatMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.NarrowFlatMap( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + [&] (TRuntimeNode::TList item) -> TRuntimeNode { + auto x = pb.NewOptional(item.front()); + return Flow ? pb.ToFlow(x) : x; + } + ); +} + +TRuntimeNode CreateNarrowMultiMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.NarrowMultiMap( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList { + return {item.front(), item.front()}; + } + ); +} + +template<bool WithPayload> +TRuntimeNode CreateSqueezeToSortedDict(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + return pb.FlatMap( + pb.NarrowSqueezeToSortedDict( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + /*all*/ false, + /*keySelector*/ [&](TRuntimeNode::TList item) { return item.front(); }, + /*payloadSelector*/ [&](TRuntimeNode::TList ) { return WithPayload ? pb.NewDataLiteral<ui64>(0) : pb.NewVoid(); } + ), + [&] (TRuntimeNode item) { return pb.DictKeys(item); } + ); +} + +TRuntimeNode CreateMapJoin(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + const auto tupleType = pb.NewTupleType({ + pb.NewDataType(NUdf::TDataType<ui32>::Id), + pb.NewDataType(NUdf::TDataType<ui64>::Id) + }); + + const auto list1 = pb.Map(flow, [&] (TRuntimeNode item) { + return pb.NewTuple({pb.Mod(item, pb.NewDataLiteral<ui64>(1000)), pb.NewDataLiteral<ui32>(1)}); + }); + + const auto list2 = pb.NewList(tupleType, { + pb.NewTuple({pb.NewDataLiteral<ui32>(1), pb.NewDataLiteral<ui64>(3 * 1000)}), + pb.NewTuple({pb.NewDataLiteral<ui32>(2), pb.NewDataLiteral<ui64>(4 * 1000)}), + pb.NewTuple({pb.NewDataLiteral<ui32>(3), pb.NewDataLiteral<ui64>(5 * 1000)}), + }); + + const auto dict = pb.ToSortedDict(list2, false, + [&](TRuntimeNode item) { + return pb.Nth(item, 0); + }, + [&](TRuntimeNode item) { + return pb.NewTuple({pb.Nth(item, 1U)}); + }); + + const auto resultType = pb.NewFlowType(pb.NewMultiType({ + pb.NewDataType(NUdf::TDataType<char*>::Id), + pb.NewDataType(NUdf::TDataType<char*>::Id), + })); + + return pb.Map( + pb.NarrowMap(pb.MapJoinCore( + pb.ExpandMap(list1, [&] (TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0), pb.Nth(item, 1)}; }), + dict, + EJoinKind::Inner, + {0U}, + {1U, 0U}, + {0U, 1U}, + resultType + ), + [&](TRuntimeNode::TList items) { return pb.NewTuple(items); } + ), + [&](TRuntimeNode item) { return pb.Nth(item, 1); } + ); +} + +Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { + template<class T> + void ParallelProgTest(T f, bool useLLVM, ui64 testResult, size_t vecSize = 10'000) { + TTimer t("total: "); + const ui32 cacheSizeInBytes = 104857600; // 100 MiB + const ui32 inFlight = 7; + TComputationPatternLRUCache cache({cacheSizeInBytes, cacheSizeInBytes}); + + auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); + auto entry = std::make_shared<TPatternCacheEntry>(); + TScopedAlloc& alloc = entry->Alloc; + TTypeEnvironment& typeEnv = entry->Env; + + TProgramBuilder pb(typeEnv, *functionRegistry); + + const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id)); + const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build(); + TRuntimeNode progReturn; + with_lock(alloc) { + progReturn = f(pb, vecSize, list); + } + + TExploringNodeVisitor explorer; + explorer.Walk(progReturn.GetNode(), typeEnv); + + TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(), functionRegistry.Get(), + NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, useLLVM ? "" : "OFF", EGraphPerProcess::Multi); + + { + auto guard = entry->Env.BindAllocator(); + entry->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts); + } + cache.EmplacePattern("a", entry); + auto genData = [&]() { + std::vector<ui64> data; + data.reserve(vecSize); + for (ui64 i = 0; i < vecSize; ++i) { + data.push_back((i + 124515) % 6740234); + } + return data; + }; + + const auto data = genData(); + + std::vector<std::vector<ui64>> results(inFlight); + + NPar::LocalExecutor().RunAdditionalThreads(inFlight); + NPar::LocalExecutor().ExecRange([&](int id) { + for (ui32 i = 0; i < 100; ++i) { + auto key = "a"; + + auto randomProvider = CreateDeterministicRandomProvider(1); + auto timeProvider = CreateDeterministicTimeProvider(10000000); + TScopedAlloc graphAlloc(__LOCATION__); + + auto entry = cache.Find(key); + + TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetListTestFactory(), + functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, + useLLVM ? "" : "OFF", EGraphPerProcess::Multi); + + auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); + TUnboxedValue* items = nullptr; + graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items)); + + std::transform(data.cbegin(), data.cend(), items, + [](const auto s) { + return ToValue<ui64>(s); + }); + + ui64 acc = 0; + TUnboxedValue v = graph->GetValue(); + while (v.HasValue()) { + acc += v.Get<ui64>(); + v = graph->GetValue(); + } + results[id].push_back(acc); + } + }, 0, inFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); + + for (auto threadResults : results) { + for (auto res : threadResults) { + UNIT_ASSERT_VALUES_EQUAL(res, testResult); + } + } + } + + Y_UNIT_TEST_QUAD(Filter, Wide, UseLLVM) { + ParallelProgTest(CreateFilter<Wide>, UseLLVM, 10098816); + } + + Y_UNIT_TEST_QUAD(Map, Wide, UseLLVM) { + ParallelProgTest(CreateMap<Wide>, UseLLVM, 78); + } + + Y_UNIT_TEST_QUAD(Condense, Wide, UseLLVM) { + ParallelProgTest(CreateCondense<Wide>, UseLLVM, 1295145000); + } + + Y_UNIT_TEST_QUAD(Chopper, Wide, UseLLVM) { + ParallelProgTest(CreateChopper<Wide>, UseLLVM, 1295145000); + } + + Y_UNIT_TEST_QUAD(Combine, Wide, UseLLVM) { + ParallelProgTest(CreateCombine<Wide>, UseLLVM, 1295145000); + } + + Y_UNIT_TEST_QUAD(Chain1Map, Wide, UseLLVM) { + ParallelProgTest(CreateChain1Map<Wide>, UseLLVM, 6393039240000); + } + + Y_UNIT_TEST_QUAD(Discard, Wide, UseLLVM) { + ParallelProgTest(CreateDiscard<Wide>, UseLLVM, 0); + } + + Y_UNIT_TEST_QUAD(Skip, Wide, UseLLVM) { + ParallelProgTest(CreateSkip<Wide>, UseLLVM, 1232762750); + } + + Y_UNIT_TEST_QUAD(NarrowFlatMap, Flow, UseLLVM) { + ParallelProgTest(CreateNarrowFlatMap<Flow>, UseLLVM, 1295145000); + } + + Y_UNIT_TEST_TWIN(NarrowMultiMap, UseLLVM) { + ParallelProgTest(CreateNarrowMultiMap, UseLLVM, 1295145000ull * 2); + } + + Y_UNIT_TEST_QUAD(SqueezeToSortedDict, WithPayload, UseLLVM) { + ParallelProgTest(CreateSqueezeToSortedDict<WithPayload>, UseLLVM, 125014500, 1000); + } + + Y_UNIT_TEST_TWIN(MapJoin, UseLLVM) { + ParallelProgTest(CreateMapJoin, UseLLVM, 120000, 10'000); + } +} + + +Y_UNIT_TEST_SUITE(ComputationPatternCache) { + Y_UNIT_TEST(Smoke) { + const ui32 cacheSize = 10'000'000; + const ui32 cacheItems = 10; + TComputationPatternLRUCache cache({cacheSize, cacheSize}); + + auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); + + for (ui32 i = 0; i < cacheItems; ++i) { + auto entry = std::make_shared<TPatternCacheEntry>(); + TScopedAlloc& alloc = entry->Alloc; + TTypeEnvironment& typeEnv = entry->Env; + + TProgramBuilder pb(typeEnv, *functionRegistry); + + TRuntimeNode progReturn; + with_lock(alloc) { + progReturn = pb.NewDataLiteral<NYql::NUdf::EDataSlot::String>("qwerty"); + } + + TExploringNodeVisitor explorer; + explorer.Walk(progReturn.GetNode(), typeEnv); + + TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(), + functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, + "OFF", EGraphPerProcess::Multi); + + { + auto guard = entry->Env.BindAllocator(); + entry->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts); + } + + // XXX: There is no way to accurately define how the entry's + // allocator obtains the memory pages: using the free ones from the + // global page pool or the ones directly requested by <mmap>. At the + // same time, it is the total allocated bytes (not just the number + // of the borrowed pages) that is a good estimate of the memory + // consumed by the pattern cache entry for real life workload. + // Hence, to avoid undesired cache flushes, release the free pages + // of the allocator of the particular entry. + alloc.ReleaseFreePages(); + cache.EmplacePattern(TString((char)('a' + i)), entry); + } + + for (ui32 i = 0; i < cacheItems; ++i) { + auto key = TString((char)('a' + i)); + + auto randomProvider = CreateDeterministicRandomProvider(1); + auto timeProvider = CreateDeterministicTimeProvider(10000000); + TScopedAlloc graphAlloc(__LOCATION__); + auto entry = cache.Find(key); + UNIT_ASSERT(entry); + TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetBuiltinFactory(), + functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, + "OFF", EGraphPerProcess::Multi); + + auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); + auto value = graph->GetValue(); + UNIT_ASSERT_EQUAL(NYql::NUdf::TStringRef("qwerty"), value.AsStringRef()); + } + } + + Y_UNIT_TEST(DoubleNotifyPatternCompiled) { + class TMockComputationPattern final : public IComputationPattern { + public: + explicit TMockComputationPattern(size_t codeSize) : Size_(codeSize) {} + + void Compile(TString, IStatsRegistry*) override { Compiled_ = true; } + bool IsCompiled() const override { return Compiled_; } + size_t CompiledCodeSize() const override { return Size_; } + void RemoveCompiledCode() override { Compiled_ = false; } + THolder<IComputationGraph> Clone(const TComputationOptsFull&) override { return {}; } + bool GetSuitableForCache() const override { return true; } + + private: + const size_t Size_; + bool Compiled_ = false; + }; + + const TString key = "program"; + const ui32 cacheSize = 2; + TComputationPatternLRUCache cache({cacheSize, cacheSize}); + + auto entry = std::make_shared<TPatternCacheEntry>(); + entry->Pattern = MakeIntrusive<TMockComputationPattern>(1u); + cache.EmplacePattern(key, entry); + + for (ui32 i = 0; i < cacheSize + 1; ++i) { + entry->Pattern->Compile("", nullptr); + cache.NotifyPatternCompiled(key); + } + + entry = std::make_shared<TPatternCacheEntry>(); + entry->Pattern = MakeIntrusive<TMockComputationPattern>(cacheSize + 1); + entry->Pattern->Compile("", nullptr); + cache.EmplacePattern(key, entry); + } + + Y_UNIT_TEST(AddPerf) { + TTimer t("all: "); + TScopedAlloc alloc(__LOCATION__); + TTypeEnvironment typeEnv(alloc); + + auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); + + TProgramBuilder pb(typeEnv, *functionRegistry); + auto prog1 = pb.NewDataLiteral<ui64>(123591592ULL); + auto prog2 = pb.NewDataLiteral<ui64>(323591592ULL); + auto progReturn = pb.Add(prog1, prog2); + + TExploringNodeVisitor explorer; + explorer.Walk(progReturn.GetNode(), typeEnv); + + NUdf::EValidateMode validateMode = NUdf::EValidateMode::Lazy; + TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(), + functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception, + "OFF", EGraphPerProcess::Multi); + + auto t_make_pattern = std::make_unique<TTimer>("make_pattern: "); + auto pattern = MakeComputationPattern(explorer, progReturn, {}, opts); + t_make_pattern.reset(); + auto randomProvider = CreateDeterministicRandomProvider(1); + auto timeProvider = CreateDeterministicTimeProvider(10000000); + + auto t_clone = std::make_unique<TTimer>("clone: "); + auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider)); + t_clone.reset(); + + const ui64 repeats = 100'000; + + { + TTimer t("graph: "); + ui64 acc = 0; + for (ui64 i = 0; i < repeats; ++i) { + acc += graph->GetValue().Get<ui64>(); + } + Y_DO_NOT_OPTIMIZE_AWAY(acc); + } + { + std::function<ui64(ui64, ui64)> add = [](ui64 a, ui64 b) { + return a + b; + }; + + TTimer t("lambda: "); + ui64 acc = 0; + for (ui64 i = 0; i < repeats; ++i) { + acc += add(123591592ULL, 323591592ULL); + } + Y_DO_NOT_OPTIMIZE_AWAY(acc); + } + { + std::function<TUnboxedValue(TUnboxedValue&, TUnboxedValue&)> add = + [](TUnboxedValue& a, TUnboxedValue& b) { + return TUnboxedValuePod(a.Get<ui64>() + b.Get<ui64>()); + }; + Y_DO_NOT_OPTIMIZE_AWAY(add); + + TTimer t("lambda unboxed value: "); + TUnboxedValue acc(TUnboxedValuePod(0)); + TUnboxedValue v1(TUnboxedValuePod(ui64{123591592UL})); + TUnboxedValue v2(TUnboxedValuePod(ui64{323591592UL})); + for (ui64 i = 0; i < repeats; ++i) { + auto r = add(v1, v2); + acc = add(r, acc); + } + Y_DO_NOT_OPTIMIZE_AWAY(acc.Get<ui64>()); + } + } + + Y_UNIT_TEST_TWIN(FilterPerf, Wide) { + TScopedAlloc alloc(__LOCATION__); + TTypeEnvironment typeEnv(alloc); + + auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); + + TProgramBuilder pb(typeEnv, *functionRegistry); + const ui64 vecSize = 100'000; + Cerr << "vecSize: " << vecSize << Endl; + const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id)); + const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build(); + auto progReturn = CreateFilter<Wide>(pb, vecSize, list); + + TExploringNodeVisitor explorer; + explorer.Walk(progReturn.GetNode(), typeEnv); + + NUdf::EValidateMode validateMode = NUdf::EValidateMode::Max; + TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(), + functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception, + "OFF", EGraphPerProcess::Multi); + + auto t_make_pattern = std::make_unique<TTimer>("make_pattern: "); + auto pattern = MakeComputationPattern(explorer, progReturn, {list}, opts); + t_make_pattern.reset(); + + auto randomProvider = CreateDeterministicRandomProvider(1); + auto timeProvider = CreateDeterministicTimeProvider(10000000); + + auto t_clone = std::make_unique<TTimer>("clone: "); + auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider)); + + t_clone.reset(); + + auto genData = [&]() { + std::vector<ui64> data; + data.reserve(vecSize); + for (ui64 i = 0; i < vecSize; ++i) { + data.push_back((i + 124515) % 6740234); + } + return data; + }; + + auto testResult = [&] (ui64 acc, ui64 count) { + if (vecSize == 100'000'000) { + UNIT_ASSERT_VALUES_EQUAL(acc, 2614128386688); + UNIT_ASSERT_VALUES_EQUAL(count, 781263); + } else if (vecSize == 10'000'000) { + UNIT_ASSERT_VALUES_EQUAL(acc, 222145217664); + } else if (vecSize == 100'000) { + UNIT_ASSERT_VALUES_EQUAL(acc, 136480896); + UNIT_ASSERT_VALUES_EQUAL(count, 782); + } else { + UNIT_FAIL("result is not checked"); + } + }; + + ui64 kIter = 2; + { + TDuration total; + for (ui64 i = 0; i < kIter; ++i) { + ui64 acc = 0; + ui64 count = 0; + + auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider)); + auto data = genData(); + TUnboxedValue* items = nullptr; + graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items)); + + std::transform(data.cbegin(), data.cend(), items, + [](const auto s) { + return ToValue<ui64>(s); + }); + + TSimpleTimer t; + TUnboxedValue v = graph->GetValue(); + while (v.HasValue()) { + acc += v.Get<ui64>(); + ++count; + v = graph->GetValue(); + } + testResult(acc, count); + + total += t.Get(); + } + Cerr << "graph: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl; + } + + { + auto data = genData(); + std::function<bool(ui64)> predicate = [](ui64 a) { + return a % 128 == 0; + }; + Y_DO_NOT_OPTIMIZE_AWAY(predicate); + + TDuration total; + + for (ui64 i = 0; i < kIter; ++i) { + TSimpleTimer t; + ui64 acc = 0; + ui64 count = 0; + for (ui64 j = 0; j < data.size(); ++j) { + if (predicate(data[j])) { + acc += data[j]; + ++count; + } + } + + total += t.Get(); + + testResult(acc, count); + } + Cerr << "std::function: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl; + } + + { + auto data = genData(); + static auto predicate = [](ui64 a) { + return a % 128 == 0; + }; + Y_DO_NOT_OPTIMIZE_AWAY(predicate); + + TDuration total; + for (ui64 i = 0; i < kIter; ++i) { + TSimpleTimer t; + ui64 acc = 0; + ui64 count = 0; + for (ui64 j = 0; j < data.size(); ++j) { + if (predicate(data[j])) { + acc += data[j]; + ++count; + } + } + + total += t.Get(); + + testResult(acc, count); + } + Cerr << "lambda: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl; + } + } +} + +} +} |