aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.com>2024-11-07 04:19:26 +0300
committervvvv <vvvv@yandex-team.com>2024-11-07 04:29:50 +0300
commit2661be00f3bc47590fda9218bf0386d6355c8c88 (patch)
tree3d316c07519191283d31c5f537efc6aabb42a2f0 /yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp
parentcf2a23963ac10add28c50cc114fbf48953eca5aa (diff)
downloadydb-2661be00f3bc47590fda9218bf0386d6355c8c88.tar.gz
Moved yql/minikql YQL-19206
init [nodiff:caesar] commit_hash:d1182ef7d430ccf7e4d37ed933c7126d7bd5d6e4
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp')
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp883
1 files changed, 883 insertions, 0 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp
new file mode 100644
index 0000000000..0a28de9162
--- /dev/null
+++ b/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp
@@ -0,0 +1,883 @@
+#include "library/cpp/threading/local_executor/local_executor.h"
+#include "yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h"
+#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
+#include <yql/essentials/minikql/computation/mkql_computation_pattern_cache.h>
+#include <yql/essentials/minikql/mkql_type_builder.h>
+#include <yql/essentials/minikql/mkql_node_serialization.h>
+#include <yql/essentials/utils/yql_panic.h>
+#include <yql/essentials/minikql/mkql_node.h>
+#include <yql/essentials/minikql/mkql_program_builder.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node_impl.h>
+#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
+#include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
+#include <contrib/ydb/library/yql/dq/proto/dq_tasks.pb.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/datetime/cputimer.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+using namespace NYql::NUdf;
+
+// Returns the node factory used by the tests in this file: a callable whose
+// type name is "TestList" becomes a TExternalComputationNode, and every other
+// callable is delegated to the builtin factory.
+TComputationNodeFactory GetListTestFactory() {
+    return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+        const bool isTestList = callable.GetType()->GetName() == "TestList";
+        if (!isTestList) {
+            return GetBuiltinFactory()(callable, ctx);
+        }
+        return new TExternalComputationNode(ctx.Mutables);
+    };
+}
+
+// Produces the input flow for a test program.
+//
+// When `list` is non-null the flow is built over that callable (the tests pass
+// the "TestList" callable so that the data can be supplied at run time).
+// Otherwise the flow is built over a literal list of `vecSize` ui64 values
+// `(i + 124515) % 6740234` for i = 0 .. vecSize - 1.
+TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    if (list) {
+        return pb.ToFlow(TRuntimeNode(list, false));
+    }
+
+    // The element type is intentionally non-const: std::vector<const T> is not
+    // a valid instantiation of a standard container.
+    std::vector<TRuntimeNode> arr;
+    arr.reserve(vecSize);
+    for (ui64 i = 0; i < vecSize; ++i) {
+        arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234));
+    }
+    // Non-owning view over `arr`; `arr` outlives the AsList() call below.
+    const TArrayRef<const TRuntimeNode> arrRef(arr.data(), arr.size());
+    return pb.ToFlow(pb.AsList(arrRef));
+}
+
+// Builds a test program that keeps the flow items for which `item % 128`
+// compares equal to 0. `Wide` selects the narrow (false) or wide (true)
+// flavour of the operator.
+template<bool Wide>
+TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable* list);
+
+// Narrow flavour: a plain Filter applied directly to the flow.
+template<>
+TRuntimeNode CreateFilter<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    auto isMultipleOf128 = [&](TRuntimeNode item) -> TRuntimeNode {
+        const auto divisor = pb.NewOptional(pb.NewDataLiteral<ui64>(128));
+        const auto remainder = pb.Mod(item, divisor);
+        const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
+        return pb.AggrEquals(remainder, zero);
+    };
+    return pb.Filter(input, isMultipleOf128);
+}
+
+// Wide flavour: ExpandMap turns every item into a one-column wide row,
+// WideFilter applies the `% 128 == 0` predicate to the first column, and
+// NarrowMap converts the rows back to single items.
+template<>
+TRuntimeNode CreateFilter<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    auto toWide = [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
+    auto isMultipleOf128 = [&](TRuntimeNode::TList columns) -> TRuntimeNode {
+        const auto divisor = pb.NewOptional(pb.NewDataLiteral<ui64>(128));
+        const auto remainder = pb.Mod(columns.front(), divisor);
+        const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
+        return pb.AggrEquals(remainder, zero);
+    };
+    auto toNarrow = [&](TRuntimeNode::TList columns) -> TRuntimeNode { return columns.front(); };
+
+    const auto wideInput = pb.ExpandMap(input, toWide);
+    const auto wideFiltered = pb.WideFilter(wideInput, isMultipleOf128);
+    return pb.NarrowMap(wideFiltered, toNarrow);
+}
+
+// Builds a test program that maps every flow item to the result of comparing
+// `item % 128` with 0. `Wide` selects the narrow (false) or wide (true)
+// flavour of the operator.
+template<bool Wide>
+TRuntimeNode CreateMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
+
+// Narrow flavour: a plain Map applied directly to the flow.
+template<>
+TRuntimeNode CreateMap<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    auto isMultipleOf128 = [&](TRuntimeNode item) -> TRuntimeNode {
+        const auto divisor = pb.NewOptional(pb.NewDataLiteral<ui64>(128));
+        const auto remainder = pb.Mod(item, divisor);
+        const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
+        return pb.AggrEquals(remainder, zero);
+    };
+    return pb.Map(input, isMultipleOf128);
+}
+
+// Wide flavour: ExpandMap -> WideMap (same `% 128 == 0` expression on the
+// first column) -> NarrowMap back to single items.
+template<>
+TRuntimeNode CreateMap<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    auto toWide = [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
+    auto isMultipleOf128 = [&](TRuntimeNode::TList columns) -> TRuntimeNode::TList {
+        const auto divisor = pb.NewOptional(pb.NewDataLiteral<ui64>(128));
+        const auto remainder = pb.Mod(columns.front(), divisor);
+        const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
+        return {pb.AggrEquals(remainder, zero)};
+    };
+    auto toNarrow = [&](TRuntimeNode::TList columns) -> TRuntimeNode { return columns.front(); };
+
+    const auto wideInput = pb.ExpandMap(input, toWide);
+    const auto wideMapped = pb.WideMap(wideInput, isMultipleOf128);
+    return pb.NarrowMap(wideMapped, toNarrow);
+}
+
+// Builds a test program that condenses the whole flow into the sum of its
+// items. `Wide` selects the narrow (false) or wide (true) flavour.
+template<bool Wide>
+TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
+
+// Narrow flavour: Condense with a ui64 zero initial state, a switcher that is
+// always false and an update step that adds the item to the state.
+template<>
+TRuntimeNode CreateCondense<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    auto neverSwitch = [&](TRuntimeNode, TRuntimeNode) -> TRuntimeNode {
+        return pb.NewDataLiteral<bool>(false);
+    };
+    auto accumulate = [&](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode {
+        return pb.Add(item, state);
+    };
+    const TRuntimeNode initialState = pb.NewDataLiteral<ui64>(0);
+    return pb.Condense(input, initialState, neverSwitch, accumulate);
+}
+
+// Wide flavour: ExpandMap -> WideCondense1 -> NarrowMap.
+//
+// WideCondense1 takes its initial state from the init lambda (the row itself),
+// so no separate initial-state literal is needed here. The switcher is always
+// false and the handler adds the first column of the item to the first column
+// of the state.
+template<>
+TRuntimeNode CreateCondense<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer t(TString(__func__) + ": ");
+    auto flow = CreateFlow(pb, vecSize, list);
+
+    return pb.NarrowMap(
+        pb.WideCondense1(
+            /* stream */
+            pb.ExpandMap(flow,
+                [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
+            ),
+            /* init */
+            [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; },
+            /* switcher */
+            [&](TRuntimeNode::TList, TRuntimeNode::TList) -> TRuntimeNode { return pb.NewDataLiteral<bool>(false); },
+            /* handler */
+            [&](TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; }
+        ),
+        [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }
+    );
+}
+
+// Builds a test program that runs the flow through a chopper operator.
+// `Wide` selects the narrow (false) or wide (true) flavour.
+template<bool Wide>
+TRuntimeNode CreateChopper(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
+
+// Narrow flavour: the key is the item itself, the group switch is the
+// `key % 128 == 0` comparison and the group handler returns the group
+// unchanged.
+template<>
+TRuntimeNode CreateChopper<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer t(TString(__func__) + ": ");
+    auto flow = CreateFlow(pb, vecSize, list);
+
+    return pb.Chopper(flow,
+        /* keyExtractor */
+        [&](TRuntimeNode item) -> TRuntimeNode { return item; },
+        /* groupSwitch */
+        [&](TRuntimeNode key, TRuntimeNode /*item*/) -> TRuntimeNode {
+            return pb.AggrEquals(
+                pb.Mod(key, pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
+                pb.NewOptional(pb.NewDataLiteral<ui64>(0)));
+        },
+        /* groupHandler */
+        [&](TRuntimeNode, TRuntimeNode list) -> TRuntimeNode { return list; }
+    );
+}
+
+// Wide flavour: ExpandMap -> WideChopper -> NarrowMap. The key is the whole
+// one-column row, the group switch tests `key.front() % 128 == 0`, and the
+// group handler passes the group through an identity WideMap.
+template<>
+TRuntimeNode CreateChopper<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer t(TString(__func__) + ": ");
+    auto flow = CreateFlow(pb, vecSize, list);
+
+    return pb.NarrowMap(
+        pb.WideChopper(
+            /* stream */
+            pb.ExpandMap(flow,
+                [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
+            ),
+            /* keyExtractor */
+            [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; },
+            /* groupSwitch */
+            [&](TRuntimeNode::TList key, TRuntimeNode::TList /*item*/) -> TRuntimeNode {
+                return pb.AggrEquals(
+                    pb.Mod(key.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
+                    pb.NewOptional(pb.NewDataLiteral<ui64>(0)));
+            },
+            /* groupHandler */
+            [&](TRuntimeNode::TList, TRuntimeNode input) { return pb.WideMap(input, [](TRuntimeNode::TList items) { return items; }); }
+        ),
+        [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }
+    );
+}
+
+// Builds a test program that combines all flow items under one constant key
+// and sums them. `Wide` selects the narrow (false) or wide (true) flavour.
+template<bool Wide>
+TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
+
+// Narrow flavour: CombineCore with a constant ui64 key of 0, the first item as
+// the initial state, addition as the update step and an optional-wrapped state
+// as the result. The memory limit argument is 64 << 20.
+template<>
+TRuntimeNode CreateCombine<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer t(TString(__func__) + ": ");
+    auto flow = CreateFlow(pb, vecSize, list);
+
+    return pb.CombineCore(
+        /* stream */
+        flow,
+        /* keyExtractor */
+        [&] (TRuntimeNode /*item*/) -> TRuntimeNode { return pb.NewDataLiteral<ui64>(0);},
+        /* init */
+        [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return item; },
+        /* update */
+        [&] (TRuntimeNode /* key */, TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); },
+        /* finish */
+        [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return pb.NewOptional(item); },
+        /* memlimit */
+        64 << 20
+    );
+}
+
+// Wide flavour: ExpandMap -> WideCombiner -> NarrowMap, using the same
+// constant key, init, update and finish logic as the narrow flavour but on
+// one-column wide rows. The memory limit argument is 64 << 20.
+template<>
+TRuntimeNode CreateCombine<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer t(TString(__func__) + ": ");
+    auto flow = CreateFlow(pb, vecSize, list);
+
+    return pb.NarrowMap(
+        pb.WideCombiner(
+            /* stream */
+            pb.ExpandMap(flow,
+                [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
+            ),
+            /* memlimit */
+            64 << 20,
+            /* keyExtractor */
+            [&] (TRuntimeNode::TList /*item*/) -> TRuntimeNode::TList { return {pb.NewDataLiteral<ui64>(0)};},
+            /* init */
+            [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; },
+            /* update */
+            [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList {
+                return {pb.Add(item.front(), state.front())};
+            },
+            /* finish */
+            [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {pb.NewOptional(item.front())}; }
+        ),
+        [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }
+    );
+}
+
+// Builds a test program that applies Chain1Map to the flow: the first item
+// seeds the state and every later item is added to it. `Wide` selects the
+// narrow (false) or wide (true) flavour.
+template<bool Wide>
+TRuntimeNode CreateChain1Map(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
+
+// Narrow flavour: a plain Chain1Map applied directly to the flow.
+template<>
+TRuntimeNode CreateChain1Map<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    auto seed = [&](TRuntimeNode item) -> TRuntimeNode { return item; };
+    auto accumulate = [&](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode {
+        return pb.Add(item, state);
+    };
+    return pb.Chain1Map(input, seed, accumulate);
+}
+
+// Wide flavour: ExpandMap -> WideChain1Map (row seeds the state, first columns
+// are added on update) -> NarrowMap back to single items.
+template<>
+TRuntimeNode CreateChain1Map<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    auto toWide = [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
+    auto seed = [&](TRuntimeNode::TList row) -> TRuntimeNode::TList { return row; };
+    auto accumulate = [&](TRuntimeNode::TList row, TRuntimeNode::TList state) -> TRuntimeNode::TList {
+        return {pb.Add(row.front(), state.front())};
+    };
+    auto toNarrow = [&](TRuntimeNode::TList row) -> TRuntimeNode { return row.front(); };
+
+    const auto wideInput = pb.ExpandMap(input, toWide);
+    const auto wideChained = pb.WideChain1Map(wideInput, seed, accumulate);
+    return pb.NarrowMap(wideChained, toNarrow);
+}
+
+// Builds a test program that discards the flow. With Wide == true the flow is
+// first expanded into one-column wide rows; otherwise Discard is applied to
+// the flow directly.
+template<bool Wide>
+TRuntimeNode CreateDiscard(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    if (!Wide) {
+        return pb.Discard(input);
+    }
+
+    auto toWide = [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
+    return pb.Discard(pb.ExpandMap(input, toWide));
+}
+
+// Builds a test program that applies Skip with a count of 500 to the flow.
+// With Wide == true the flow is expanded to one-column wide rows before Skip
+// and narrowed back afterwards.
+template<bool Wide>
+TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    const auto skipCount = pb.NewDataLiteral<ui64>(500);
+    if (!Wide) {
+        return pb.Skip(input, skipCount);
+    }
+
+    auto toWide = [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
+    auto toNarrow = [&](TRuntimeNode::TList row) -> TRuntimeNode { return row.front(); };
+
+    const auto wideInput = pb.ExpandMap(input, toWide);
+    const auto wideSkipped = pb.Skip(wideInput, skipCount);
+    return pb.NarrowMap(wideSkipped, toNarrow);
+}
+
+// Builds a test program that expands the flow to one-column wide rows and
+// applies NarrowFlatMap. The handler wraps the first column into an optional;
+// with Flow == true that optional is additionally converted with ToFlow.
+template<bool Flow>
+TRuntimeNode CreateNarrowFlatMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    auto toWide = [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
+    auto wrapFirstColumn = [&](TRuntimeNode::TList row) -> TRuntimeNode {
+        auto wrapped = pb.NewOptional(row.front());
+        if (Flow) {
+            return pb.ToFlow(wrapped);
+        }
+        return wrapped;
+    };
+
+    return pb.NarrowFlatMap(pb.ExpandMap(input, toWide), wrapFirstColumn);
+}
+
+// Builds a test program that expands the flow to one-column wide rows and
+// applies NarrowMultiMap with a handler that returns the first column twice.
+TRuntimeNode CreateNarrowMultiMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    auto toWide = [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
+    auto duplicateFirstColumn = [&](TRuntimeNode::TList row) -> TRuntimeNode::TList {
+        return {row.front(), row.front()};
+    };
+
+    return pb.NarrowMultiMap(pb.ExpandMap(input, toWide), duplicateFirstColumn);
+}
+
+// Builds a test program that squeezes the (expanded) flow into a sorted dict
+// keyed by the first column and then flat-maps the result to the dict keys.
+// With WithPayload == true the payload is a ui64 zero literal, otherwise it is
+// Void.
+template<bool WithPayload>
+TRuntimeNode CreateSqueezeToSortedDict(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    auto toWide = [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
+    auto keySelector = [&](TRuntimeNode::TList row) { return row.front(); };
+    auto payloadSelector = [&](TRuntimeNode::TList) {
+        return WithPayload ? pb.NewDataLiteral<ui64>(0) : pb.NewVoid();
+    };
+    auto dictKeys = [&](TRuntimeNode dict) { return pb.DictKeys(dict); };
+
+    const auto wideInput = pb.ExpandMap(input, toWide);
+    const auto squeezed = pb.NarrowSqueezeToSortedDict(wideInput, /*all*/ false, keySelector, payloadSelector);
+    return pb.FlatMap(squeezed, dictKeys);
+}
+
+// Builds a test program that inner-joins the flow against a small literal
+// dictionary with MapJoinCore and returns the second column of every joined
+// row.
+//
+// Left side: tuples (item % 1000, ui32 1) expanded into two wide columns.
+// Right side: a sorted dict built from three (ui32 key, ui64 payload) tuples
+// with keys 1, 2, 3 and payloads 3000, 4000, 5000.
+TRuntimeNode CreateMapJoin(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
+    TTimer timer(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    const auto rightTupleType = pb.NewTupleType({
+        pb.NewDataType(NUdf::TDataType<ui32>::Id),
+        pb.NewDataType(NUdf::TDataType<ui64>::Id)
+    });
+
+    const auto left = pb.Map(input, [&] (TRuntimeNode item) {
+        return pb.NewTuple({pb.Mod(item, pb.NewDataLiteral<ui64>(1000)), pb.NewDataLiteral<ui32>(1)});
+    });
+
+    const auto right = pb.NewList(rightTupleType, {
+        pb.NewTuple({pb.NewDataLiteral<ui32>(1), pb.NewDataLiteral<ui64>(3 * 1000)}),
+        pb.NewTuple({pb.NewDataLiteral<ui32>(2), pb.NewDataLiteral<ui64>(4 * 1000)}),
+        pb.NewTuple({pb.NewDataLiteral<ui32>(3), pb.NewDataLiteral<ui64>(5 * 1000)}),
+    });
+
+    const auto rightDict = pb.ToSortedDict(right, false,
+        [&](TRuntimeNode item) {
+            return pb.Nth(item, 0);
+        },
+        [&](TRuntimeNode item) {
+            return pb.NewTuple({pb.Nth(item, 1U)});
+        });
+
+    // NOTE(review): both result columns are declared with TDataType<char*>,
+    // while the joined columns above are built from ui32/ui64 values -
+    // confirm that this declared type is intentional.
+    const auto resultType = pb.NewFlowType(pb.NewMultiType({
+        pb.NewDataType(NUdf::TDataType<char*>::Id),
+        pb.NewDataType(NUdf::TDataType<char*>::Id),
+    }));
+
+    const auto wideLeft = pb.ExpandMap(left, [&] (TRuntimeNode item) -> TRuntimeNode::TList {
+        return {pb.Nth(item, 0), pb.Nth(item, 1)};
+    });
+    const auto joined = pb.MapJoinCore(
+        wideLeft,
+        rightDict,
+        EJoinKind::Inner,
+        {0U},
+        {1U, 0U},
+        {0U, 1U},
+        resultType
+    );
+    const auto joinedTuples = pb.NarrowMap(joined,
+        [&](TRuntimeNode::TList items) { return pb.NewTuple(items); }
+    );
+    return pb.Map(joinedTuples,
+        [&](TRuntimeNode item) { return pb.Nth(item, 1); }
+    );
+}
+
+Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
+    // Builds one computation pattern with `f`, stores it in a pattern cache
+    // under the key "a", and then clones and runs it concurrently from
+    // `inFlight` executor tasks, `iterations` times each.
+    //
+    // f          - program builder callback: f(pb, vecSize, list) -> TRuntimeNode.
+    // useLLVM    - passes "" (true) or "OFF" (false) as the LLVM option string.
+    // testResult - expected sum of all ui64 values produced by one graph run.
+    // vecSize    - number of input items fed through the "TestList" entry point.
+    //
+    // Every run must yield `testResult`, and every task must have recorded
+    // exactly `iterations` results.
+    template<class T>
+    void ParallelProgTest(T f, bool useLLVM, ui64 testResult, size_t vecSize = 10'000) {
+        TTimer t("total: ");
+        const ui32 cacheSizeInBytes = 104857600; // 100 MiB
+        const ui32 inFlight = 7;
+        const ui32 iterations = 100;
+        TComputationPatternLRUCache cache({cacheSizeInBytes, cacheSizeInBytes});
+
+        auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
+        auto entry = std::make_shared<TPatternCacheEntry>();
+        TScopedAlloc& alloc = entry->Alloc;
+        TTypeEnvironment& typeEnv = entry->Env;
+
+        TProgramBuilder pb(typeEnv, *functionRegistry);
+
+        const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
+        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
+        TRuntimeNode progReturn;
+        with_lock(alloc) {
+            progReturn = f(pb, vecSize, list);
+        }
+
+        TExploringNodeVisitor explorer;
+        explorer.Walk(progReturn.GetNode(), typeEnv);
+
+        TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(), functionRegistry.Get(),
+            NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, useLLVM ? "" : "OFF", EGraphPerProcess::Multi);
+
+        {
+            auto guard = entry->Env.BindAllocator();
+            entry->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
+        }
+        cache.EmplacePattern("a", entry);
+        auto genData = [&]() {
+            std::vector<ui64> data;
+            data.reserve(vecSize);
+            for (ui64 i = 0; i < vecSize; ++i) {
+                data.push_back((i + 124515) % 6740234);
+            }
+            return data;
+        };
+
+        const auto data = genData();
+
+        // One result vector per task id, so tasks never write to the same vector.
+        std::vector<std::vector<ui64>> results(inFlight);
+
+        NPar::LocalExecutor().RunAdditionalThreads(inFlight);
+        NPar::LocalExecutor().ExecRange([&](int id) {
+            for (ui32 i = 0; i < iterations; ++i) {
+                auto key = "a";
+
+                auto randomProvider = CreateDeterministicRandomProvider(1);
+                auto timeProvider = CreateDeterministicTimeProvider(10000000);
+                TScopedAlloc graphAlloc(__LOCATION__);
+
+                auto entry = cache.Find(key);
+
+                TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetListTestFactory(),
+                    functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
+                    useLLVM ? "" : "OFF", EGraphPerProcess::Multi);
+
+                auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
+                TUnboxedValue* items = nullptr;
+                graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items));
+
+                std::transform(data.cbegin(), data.cend(), items,
+                    [](const auto s) {
+                        return ToValue<ui64>(s);
+                    });
+
+                ui64 acc = 0;
+                TUnboxedValue v = graph->GetValue();
+                while (v.HasValue()) {
+                    acc += v.Get<ui64>();
+                    v = graph->GetValue();
+                }
+                results[id].push_back(acc);
+            }
+        }, 0, inFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
+
+        // Iterate by reference: copying each per-task vector is unnecessary.
+        for (const auto& threadResults : results) {
+            // Guard against a vacuous pass when a task recorded too few results.
+            UNIT_ASSERT_VALUES_EQUAL(threadResults.size(), iterations);
+            for (const auto res : threadResults) {
+                UNIT_ASSERT_VALUES_EQUAL(res, testResult);
+            }
+        }
+    }
+
+    // Concurrent-run check for the CreateFilter program.
+    Y_UNIT_TEST_QUAD(Filter, Wide, UseLLVM) {
+        const ui64 expectedSum = 10098816;
+        ParallelProgTest(CreateFilter<Wide>, UseLLVM, expectedSum);
+    }
+
+    // Concurrent-run check for the CreateMap program.
+    Y_UNIT_TEST_QUAD(Map, Wide, UseLLVM) {
+        const ui64 expectedSum = 78;
+        ParallelProgTest(CreateMap<Wide>, UseLLVM, expectedSum);
+    }
+
+    // Concurrent-run check for the CreateCondense program.
+    Y_UNIT_TEST_QUAD(Condense, Wide, UseLLVM) {
+        const ui64 expectedSum = 1295145000;
+        ParallelProgTest(CreateCondense<Wide>, UseLLVM, expectedSum);
+    }
+
+    // Concurrent-run check for the CreateChopper program.
+    Y_UNIT_TEST_QUAD(Chopper, Wide, UseLLVM) {
+        const ui64 expectedSum = 1295145000;
+        ParallelProgTest(CreateChopper<Wide>, UseLLVM, expectedSum);
+    }
+
+    // Concurrent-run check for the CreateCombine program.
+    Y_UNIT_TEST_QUAD(Combine, Wide, UseLLVM) {
+        const ui64 expectedSum = 1295145000;
+        ParallelProgTest(CreateCombine<Wide>, UseLLVM, expectedSum);
+    }
+
+    // Concurrent-run check for the CreateChain1Map program.
+    Y_UNIT_TEST_QUAD(Chain1Map, Wide, UseLLVM) {
+        const ui64 expectedSum = 6393039240000;
+        ParallelProgTest(CreateChain1Map<Wide>, UseLLVM, expectedSum);
+    }
+
+    // Concurrent-run check for the CreateDiscard program; the expected
+    // accumulated value is 0.
+    Y_UNIT_TEST_QUAD(Discard, Wide, UseLLVM) {
+        const ui64 expectedSum = 0;
+        ParallelProgTest(CreateDiscard<Wide>, UseLLVM, expectedSum);
+    }
+
+    // Concurrent-run check for the CreateSkip program.
+    Y_UNIT_TEST_QUAD(Skip, Wide, UseLLVM) {
+        const ui64 expectedSum = 1232762750;
+        ParallelProgTest(CreateSkip<Wide>, UseLLVM, expectedSum);
+    }
+
+    // Concurrent-run check for the CreateNarrowFlatMap program.
+    Y_UNIT_TEST_QUAD(NarrowFlatMap, Flow, UseLLVM) {
+        const ui64 expectedSum = 1295145000;
+        ParallelProgTest(CreateNarrowFlatMap<Flow>, UseLLVM, expectedSum);
+    }
+
+    // Concurrent-run check for the CreateNarrowMultiMap program; every item is
+    // emitted twice, hence the doubled expected sum.
+    Y_UNIT_TEST_TWIN(NarrowMultiMap, UseLLVM) {
+        const ui64 expectedSum = 1295145000ull * 2;
+        ParallelProgTest(CreateNarrowMultiMap, UseLLVM, expectedSum);
+    }
+
+    // Concurrent-run check for the CreateSqueezeToSortedDict program, run on
+    // 1000 input items.
+    Y_UNIT_TEST_QUAD(SqueezeToSortedDict, WithPayload, UseLLVM) {
+        const ui64 expectedSum = 125014500;
+        const size_t inputSize = 1000;
+        ParallelProgTest(CreateSqueezeToSortedDict<WithPayload>, UseLLVM, expectedSum, inputSize);
+    }
+
+    // Concurrent-run check for the CreateMapJoin program, run on 10'000 input
+    // items.
+    Y_UNIT_TEST_TWIN(MapJoin, UseLLVM) {
+        const ui64 expectedSum = 120000;
+        const size_t inputSize = 10'000;
+        ParallelProgTest(CreateMapJoin, UseLLVM, expectedSum, inputSize);
+    }
+}
+
+
+Y_UNIT_TEST_SUITE(ComputationPatternCache) {
+    // Puts `cacheItems` patterns (each a single "qwerty" string literal) into
+    // the cache under the keys "a", "b", ..., then looks every one of them up
+    // again, clones a graph from it and checks the produced value.
+    Y_UNIT_TEST(Smoke) {
+        const ui32 cacheSize = 10'000'000;
+        const ui32 cacheItems = 10;
+        TComputationPatternLRUCache cache({cacheSize, cacheSize});
+
+        auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
+
+        // Single-character cache key for the item with the given index.
+        const auto makeKey = [](ui32 index) {
+            return TString(static_cast<char>('a' + index));
+        };
+
+        for (ui32 index = 0; index < cacheItems; ++index) {
+            auto entry = std::make_shared<TPatternCacheEntry>();
+            TScopedAlloc& alloc = entry->Alloc;
+            TTypeEnvironment& typeEnv = entry->Env;
+
+            TProgramBuilder pb(typeEnv, *functionRegistry);
+
+            TRuntimeNode program;
+            with_lock(alloc) {
+                program = pb.NewDataLiteral<NYql::NUdf::EDataSlot::String>("qwerty");
+            }
+
+            TExploringNodeVisitor explorer;
+            explorer.Walk(program.GetNode(), typeEnv);
+
+            TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(),
+                functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
+                "OFF", EGraphPerProcess::Multi);
+
+            {
+                auto guard = entry->Env.BindAllocator();
+                entry->Pattern = MakeComputationPattern(explorer, program, {}, opts);
+            }
+
+            // XXX: It cannot be told precisely where the entry's allocator
+            // gets its memory pages from: free pages of the global page pool
+            // or pages requested directly via <mmap>. Meanwhile, for real
+            // life workload the total number of allocated bytes (not only the
+            // number of borrowed pages) is a good estimate of the memory a
+            // pattern cache entry consumes. So, to avoid undesired cache
+            // flushes, the free pages of this entry's allocator are released
+            // before the entry is put into the cache.
+            alloc.ReleaseFreePages();
+            cache.EmplacePattern(makeKey(index), entry);
+        }
+
+        for (ui32 index = 0; index < cacheItems; ++index) {
+            const auto key = makeKey(index);
+
+            auto randomProvider = CreateDeterministicRandomProvider(1);
+            auto timeProvider = CreateDeterministicTimeProvider(10000000);
+            TScopedAlloc graphAlloc(__LOCATION__);
+            auto entry = cache.Find(key);
+            UNIT_ASSERT(entry);
+            TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetBuiltinFactory(),
+                functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
+                "OFF", EGraphPerProcess::Multi);
+
+            auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
+            auto value = graph->GetValue();
+            UNIT_ASSERT_EQUAL(NYql::NUdf::TStringRef("qwerty"), value.AsStringRef());
+        }
+    }
+
+    // Calls NotifyPatternCompiled several times for the same key and then
+    // re-emplaces that key with an already compiled pattern whose reported
+    // code size exceeds the cache limit. The test has no explicit assertions;
+    // it passes as long as none of the cache calls fails.
+    // NOTE(review): presumably this guards the compiled-size accounting
+    // against repeated notifications - confirm against the cache implementation.
+    Y_UNIT_TEST(DoubleNotifyPatternCompiled) {
+        // Pattern stub: only tracks a "compiled" flag and reports a fixed
+        // compiled code size; Clone() returns an empty holder.
+        class TMockComputationPattern final : public IComputationPattern {
+        public:
+            explicit TMockComputationPattern(size_t codeSize) : Size_(codeSize) {}
+
+            void Compile(TString, IStatsRegistry*) override { Compiled_ = true; }
+            bool IsCompiled() const override { return Compiled_; }
+            size_t CompiledCodeSize() const override { return Size_; }
+            void RemoveCompiledCode() override { Compiled_ = false; }
+            THolder<IComputationGraph> Clone(const TComputationOptsFull&) override { return {}; }
+            bool GetSuitableForCache() const override { return true; }
+
+        private:
+            const size_t Size_;
+            bool Compiled_ = false;
+        };
+
+        const TString key = "program";
+        const ui32 cacheSize = 2;
+        TComputationPatternLRUCache cache({cacheSize, cacheSize});
+
+        auto entry = std::make_shared<TPatternCacheEntry>();
+        entry->Pattern = MakeIntrusive<TMockComputationPattern>(1u);
+        cache.EmplacePattern(key, entry);
+
+        // One notification more than the cache size.
+        for (ui32 attempt = 0; attempt <= cacheSize; ++attempt) {
+            entry->Pattern->Compile("", nullptr);
+            cache.NotifyPatternCompiled(key);
+        }
+
+        // Replace the entry with a compiled pattern larger than the limit.
+        entry = std::make_shared<TPatternCacheEntry>();
+        entry->Pattern = MakeIntrusive<TMockComputationPattern>(cacheSize + 1);
+        entry->Pattern->Compile("", nullptr);
+        cache.EmplacePattern(key, entry);
+    }
+
+    // Timing comparison for adding two ui64 constants `repeats` times:
+    // through a computation graph, through a std::function over plain ui64,
+    // and through a std::function over TUnboxedValue. Each phase is wrapped in
+    // a TTimer; no results are asserted.
+    Y_UNIT_TEST(AddPerf) {
+        TTimer totalTimer("all: ");
+        TScopedAlloc alloc(__LOCATION__);
+        TTypeEnvironment typeEnv(alloc);
+
+        auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
+
+        TProgramBuilder pb(typeEnv, *functionRegistry);
+        auto lhs = pb.NewDataLiteral<ui64>(123591592ULL);
+        auto rhs = pb.NewDataLiteral<ui64>(323591592ULL);
+        auto program = pb.Add(lhs, rhs);
+
+        TExploringNodeVisitor explorer;
+        explorer.Walk(program.GetNode(), typeEnv);
+
+        TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(),
+            functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
+            "OFF", EGraphPerProcess::Multi);
+
+        // The timers are heap-allocated so that they can be stopped with
+        // reset() while the measured objects stay in scope.
+        auto makePatternTimer = std::make_unique<TTimer>("make_pattern: ");
+        auto pattern = MakeComputationPattern(explorer, program, {}, opts);
+        makePatternTimer.reset();
+        auto randomProvider = CreateDeterministicRandomProvider(1);
+        auto timeProvider = CreateDeterministicTimeProvider(10000000);
+
+        auto cloneTimer = std::make_unique<TTimer>("clone: ");
+        auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
+        cloneTimer.reset();
+
+        const ui64 repeats = 100'000;
+
+        {
+            TTimer graphTimer("graph: ");
+            ui64 sum = 0;
+            for (ui64 round = 0; round < repeats; ++round) {
+                sum += graph->GetValue().Get<ui64>();
+            }
+            Y_DO_NOT_OPTIMIZE_AWAY(sum);
+        }
+        {
+            std::function<ui64(ui64, ui64)> plainAdd = [](ui64 a, ui64 b) {
+                return a + b;
+            };
+
+            TTimer lambdaTimer("lambda: ");
+            ui64 sum = 0;
+            for (ui64 round = 0; round < repeats; ++round) {
+                sum += plainAdd(123591592ULL, 323591592ULL);
+            }
+            Y_DO_NOT_OPTIMIZE_AWAY(sum);
+        }
+        {
+            std::function<TUnboxedValue(TUnboxedValue&, TUnboxedValue&)> unboxedAdd =
+                [](TUnboxedValue& a, TUnboxedValue& b) {
+                    return TUnboxedValuePod(a.Get<ui64>() + b.Get<ui64>());
+                };
+            Y_DO_NOT_OPTIMIZE_AWAY(unboxedAdd);
+
+            TTimer unboxedTimer("lambda unboxed value: ");
+            TUnboxedValue sum(TUnboxedValuePod(0));
+            TUnboxedValue first(TUnboxedValuePod(ui64{123591592UL}));
+            TUnboxedValue second(TUnboxedValuePod(ui64{323591592UL}));
+            for (ui64 round = 0; round < repeats; ++round) {
+                auto partial = unboxedAdd(first, second);
+                sum = unboxedAdd(partial, sum);
+            }
+            Y_DO_NOT_OPTIMIZE_AWAY(sum.Get<ui64>());
+        }
+    }
+
+    // Timing comparison for the `item % 128 == 0` filter over `vecSize` items:
+    // a computation graph built by CreateFilter<Wide>, a std::function
+    // predicate, and a plain lambda predicate. Every variant runs `kIter`
+    // times and is checked against the same expected sum and count.
+    //
+    // In all three variants the timer is read before the result is verified,
+    // so the reported times cover the filtering work only.
+    Y_UNIT_TEST_TWIN(FilterPerf, Wide) {
+        TScopedAlloc alloc(__LOCATION__);
+        TTypeEnvironment typeEnv(alloc);
+
+        auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
+
+        TProgramBuilder pb(typeEnv, *functionRegistry);
+        const ui64 vecSize = 100'000;
+        Cerr << "vecSize: " << vecSize << Endl;
+        const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
+        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
+        auto progReturn = CreateFilter<Wide>(pb, vecSize, list);
+
+        TExploringNodeVisitor explorer;
+        explorer.Walk(progReturn.GetNode(), typeEnv);
+
+        NUdf::EValidateMode validateMode = NUdf::EValidateMode::Max;
+        TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(),
+            functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception,
+            "OFF", EGraphPerProcess::Multi);
+
+        auto t_make_pattern = std::make_unique<TTimer>("make_pattern: ");
+        auto pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
+        t_make_pattern.reset();
+
+        auto randomProvider = CreateDeterministicRandomProvider(1);
+        auto timeProvider = CreateDeterministicTimeProvider(10000000);
+
+        // This clone exists only to time Clone(); the runs below clone again.
+        auto t_clone = std::make_unique<TTimer>("clone: ");
+        auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
+
+        t_clone.reset();
+
+        auto genData = [&]() {
+            std::vector<ui64> data;
+            data.reserve(vecSize);
+            for (ui64 i = 0; i < vecSize; ++i) {
+                data.push_back((i + 124515) % 6740234);
+            }
+            return data;
+        };
+
+        // Expected values are known only for a few input sizes; any other
+        // vecSize fails the test explicitly.
+        auto testResult = [&] (ui64 acc, ui64 count) {
+            if (vecSize == 100'000'000) {
+                UNIT_ASSERT_VALUES_EQUAL(acc, 2614128386688);
+                UNIT_ASSERT_VALUES_EQUAL(count, 781263);
+            } else if (vecSize == 10'000'000) {
+                UNIT_ASSERT_VALUES_EQUAL(acc, 222145217664);
+            } else if (vecSize == 100'000) {
+                UNIT_ASSERT_VALUES_EQUAL(acc, 136480896);
+                UNIT_ASSERT_VALUES_EQUAL(count, 782);
+            } else {
+                UNIT_FAIL("result is not checked");
+            }
+        };
+
+        ui64 kIter = 2;
+        {
+            TDuration total;
+            for (ui64 i = 0; i < kIter; ++i) {
+                ui64 acc = 0;
+                ui64 count = 0;
+
+                auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
+                auto data = genData();
+                TUnboxedValue* items = nullptr;
+                graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items));
+
+                std::transform(data.cbegin(), data.cend(), items,
+                    [](const auto s) {
+                        return ToValue<ui64>(s);
+                    });
+
+                TSimpleTimer t;
+                TUnboxedValue v = graph->GetValue();
+                while (v.HasValue()) {
+                    acc += v.Get<ui64>();
+                    ++count;
+                    v = graph->GetValue();
+                }
+
+                // Read the timer before verification, as the native loops do.
+                total += t.Get();
+
+                testResult(acc, count);
+            }
+            Cerr << "graph: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
+        }
+
+        {
+            auto data = genData();
+            std::function<bool(ui64)> predicate = [](ui64 a) {
+                return a % 128 == 0;
+            };
+            Y_DO_NOT_OPTIMIZE_AWAY(predicate);
+
+            TDuration total;
+
+            for (ui64 i = 0; i < kIter; ++i) {
+                TSimpleTimer t;
+                ui64 acc = 0;
+                ui64 count = 0;
+                for (ui64 j = 0; j < data.size(); ++j) {
+                    if (predicate(data[j])) {
+                        acc += data[j];
+                        ++count;
+                    }
+                }
+
+                total += t.Get();
+
+                testResult(acc, count);
+            }
+            Cerr << "std::function: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
+        }
+
+        {
+            auto data = genData();
+            static auto predicate = [](ui64 a) {
+                return a % 128 == 0;
+            };
+            Y_DO_NOT_OPTIMIZE_AWAY(predicate);
+
+            TDuration total;
+            for (ui64 i = 0; i < kIter; ++i) {
+                TSimpleTimer t;
+                ui64 acc = 0;
+                ui64 count = 0;
+                for (ui64 j = 0; j < data.size(); ++j) {
+                    if (predicate(data[j])) {
+                        acc += data[j];
+                        ++count;
+                    }
+                }
+
+                total += t.Get();
+
+                testResult(acc, count);
+            }
+            Cerr << "lambda: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
+        }
+    }
+}
+
+}
+}