diff options
-rw-r--r-- | library/python/strings/__init__.py | 1 | ||||
-rw-r--r-- | library/python/strings/strings.py | 7 | ||||
-rw-r--r-- | ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp | 318 |
3 files changed, 283 insertions, 43 deletions
diff --git a/library/python/strings/__init__.py b/library/python/strings/__init__.py index bd6bf6e7ce..c7da1463cf 100644 --- a/library/python/strings/__init__.py +++ b/library/python/strings/__init__.py @@ -4,6 +4,7 @@ from .strings import ( DEFAULT_ENCODING, ENCODING_ERRORS_POLICY, encode, + ensure_str_deep, fs_encoding, get_stream_encoding, guess_default_encoding, diff --git a/library/python/strings/strings.py b/library/python/strings/strings.py index 1005b2fe97..916ae96742 100644 --- a/library/python/strings/strings.py +++ b/library/python/strings/strings.py @@ -82,10 +82,17 @@ def _convert_deep(x, enc, convert, relaxed=True): raise TypeError('unsupported type') +# Result as from six.ensure_text def unicodize_deep(x, enc=DEFAULT_ENCODING, relaxed=True): return _convert_deep(x, enc, to_unicode, relaxed) +# Result as from six.ensure_str +def ensure_str_deep(x, enc=DEFAULT_ENCODING, relaxed=True): + return _convert_deep(x, enc, six.ensure_str, relaxed) + + +# Result as from six.ensure_binary def stringize_deep(x, enc=DEFAULT_ENCODING, relaxed=True): return _convert_deep(x, enc, to_str, relaxed) diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp index 499375743f..34ae86148e 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp @@ -1,3 +1,5 @@ +#include "library/cpp/threading/local_executor/local_executor.h" +#include "ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h" #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> #include <ydb/library/yql/minikql/mkql_node_serialization.h> @@ -19,6 +21,96 @@ namespace NMiniKQL { using namespace NYql::NUdf; +TComputationNodeFactory GetListTestFactory() { + return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + if (callable.GetType()->GetName() == "TestList") { + return new TExternalComputationNode(ctx.Mutables); + } + return GetBuiltinFactory()(callable, ctx); + }; +} + +TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { + if (list) { + return pb.ToFlow(TRuntimeNode(list, false)); + } else { + std::vector<const TRuntimeNode> arr; + arr.reserve(vecSize); + for (ui64 i = 0; i < vecSize; ++i) { + arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234)); + } + TArrayRef<const TRuntimeNode> arrRef(std::move(arr)); + return pb.ToFlow(pb.AsList(arrRef)); + } +} + +TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto handler = [&](TRuntimeNode node) -> TRuntimeNode { + return pb.AggrEquals( + pb.Mod(node, pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0))); + }; + return pb.Filter(flow, handler); +} + +TRuntimeNode CreateWideFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto handler = [&](TRuntimeNode::TList node) -> TRuntimeNode { + return pb.AggrEquals( + pb.Mod(node.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))), + pb.NewOptional(pb.NewDataLiteral<ui64>(0))); + }; + return pb.NarrowMap( + pb.WideFilter( + pb.ExpandMap(flow, + [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } + ), + handler + ), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } + ); +} + +TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) { + TTimer t(TString(__func__) + ": "); + auto flow = CreateFlow(pb, vecSize, list); + + auto switcherHandler = [&](TRuntimeNode, TRuntimeNode) -> TRuntimeNode { + return pb.NewDataLiteral<bool>(false); + }; + auto updateHandler = [&](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { + return pb.Add(item, state); + }; + TRuntimeNode state = pb.NewDataLiteral<ui64>(0); + return pb.Condense(flow, state, switcherHandler, updateHandler); +} + +/* +TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize) { + TTimer t(TString(__func__) + ": "); + std::vector<const TRuntimeNode> arr; + arr.reserve(vecSize); + for (ui64 i = 0; i < vecSize; ++i) { + arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234)); + } + TArrayRef<const TRuntimeNode> arrRef(std::move(arr)); + auto flow = pb.ToFlow(pb.AsList(arrRef)); + + TWideLambda keyExtractor = [&](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { + return pb.Add(item, state); + }; + TBinaryWideLambda init; + TTernaryWideLambda update; + TBinaryWideLambda finish; + return pb.WideLastCombiner(flow, keyExtractor , init, update, finish); + //(flow, state, switcherHandler, updateHandler); +} +*/ Y_UNIT_TEST_SUITE(ComputationPatternCache) { Y_UNIT_TEST(Smoke) { @@ -57,6 +149,7 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { auto timeProvider = CreateDeterministicTimeProvider(10000000); TScopedAlloc graphAlloc; auto patternEnv = cache.Find(key); + UNIT_ASSERT(patternEnv); TComputationPatternOpts opts(patternEnv->Alloc.Ref(), patternEnv->Env, GetBuiltinFactory(), functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Multi); @@ -67,6 +160,94 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { } } + template<class T> + void ParallelProgTest(T f, ui64 testResult) { + const ui32 cacheSize = 10; + const ui32 inFlight = 2; + TComputationPatternLRUCache cache(cacheSize); + + auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); + std::shared_ptr<TPatternWithEnv> patternEnv = cache.CreateEnv(); + TScopedAlloc &alloc = patternEnv->Alloc; + TTypeEnvironment &typeEnv = patternEnv->Env; + + TProgramBuilder pb(typeEnv, *functionRegistry); + + const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id)); + const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build(); + const ui32 vecSize = 10'000; + auto progReturn = f(pb, vecSize, list); + + TExploringNodeVisitor explorer; + explorer.Walk(progReturn.GetNode(), typeEnv); + + TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(), functionRegistry.Get(), + NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Multi); + + { + auto guard = patternEnv->Env.BindAllocator(); + patternEnv->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts); + } + cache.EmplacePattern("a", patternEnv); + auto genData = [&]() { + std::vector<ui64> data; + data.reserve(vecSize); + for (ui64 i = 0; i < vecSize; ++i) { + data.push_back((i + 124515) % 6740234); + } + return data; + }; + + const auto data = genData(); + + NPar::LocalExecutor().RunAdditionalThreads(inFlight); + NPar::LocalExecutor().ExecRange([&](int /*id*/) { + for (ui32 i = 0; i < 100; ++i) { + auto key = "a"; + + auto randomProvider = CreateDeterministicRandomProvider(1); + auto timeProvider = CreateDeterministicTimeProvider(10000000); + TScopedAlloc graphAlloc; + + auto patternEnv = cache.Find(key); + + TComputationPatternOpts opts(patternEnv->Alloc.Ref(), patternEnv->Env, GetListTestFactory(), + functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, + "OFF", EGraphPerProcess::Multi); + + auto graph = patternEnv->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); + TUnboxedValue* items = nullptr; + graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items)); + + std::transform(data.cbegin(), data.cend(), items, + [](const auto s) { + return ToValue<ui64>(s); + }); + + ui64 acc = 0; + TUnboxedValue v = graph->GetValue(); + UNIT_ASSERT(v.HasValue()); + while (v.HasValue()) { + acc += v.Get<ui64>(); + v = graph->GetValue(); + } + UNIT_ASSERT_VALUES_EQUAL(acc, testResult); + } + }, 0, inFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); + } + + Y_UNIT_TEST(ParallelFilter) { + ParallelProgTest(CreateFilter, 10098816); + } + + Y_UNIT_TEST(ParallelWideFilter) { + ParallelProgTest(CreateWideFilter, 10098816); + } + + Y_UNIT_TEST(ParallelCondense) { + ParallelProgTest(CreateCondense, 1295145000); + } + Y_UNIT_TEST(AddPerf) { TTimer t("all: "); TScopedAlloc alloc; @@ -109,7 +290,6 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { std::function<ui64(ui64, ui64)> add = [](ui64 a, ui64 b) { return a + b; }; - Y_DO_NOT_OPTIMIZE_AWAY(add); TTimer t("lambda: "); ui64 acc = 0; @@ -137,89 +317,141 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) { } } -TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize) { - TTimer t(__func__); - std::vector<const TRuntimeNode> arr; - arr.reserve(vecSize); - for (ui64 i = 0; i < vecSize; ++i) { - arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234)); - } - TArrayRef<const TRuntimeNode> arrRef(std::move(arr)); - auto arrayNode = pb.AsList(arrRef); - auto handler = [&](TRuntimeNode node) -> TRuntimeNode { - return pb.AggrEquals( - pb.Mod(node, pb.NewOptional(pb.NewDataLiteral<ui64>(128))), - pb.NewOptional(pb.NewDataLiteral<ui64>(0))); - }; - return pb.Filter(arrayNode, handler); -} - Y_UNIT_TEST(FilterPerf) { - TTimer t("all: "); TScopedAlloc alloc; TTypeEnvironment typeEnv(alloc); auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); TProgramBuilder pb(typeEnv, *functionRegistry); - const ui64 vecSize = 1'000'000; - auto progReturn = CreateFilter(pb, vecSize); + const ui64 vecSize = 100'000'000; + Cerr << "vecSize: " << vecSize << Endl; + const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id)); + const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build(); + auto progReturn = true ? CreateFilter(pb, vecSize, list) : CreateWideFilter(pb, vecSize, list); TExploringNodeVisitor explorer; explorer.Walk(progReturn.GetNode(), typeEnv); - NUdf::EValidateMode validateMode = NUdf::EValidateMode::Lazy; - TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(), + NUdf::EValidateMode validateMode = NUdf::EValidateMode::Max; + TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(), functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Multi); auto t_make_pattern = std::make_unique<TTimer>("make_pattern: "); - auto pattern = MakeComputationPattern(explorer, progReturn, {}, opts); + auto pattern = MakeComputationPattern(explorer, progReturn, {list}, opts); t_make_pattern.reset(); + auto randomProvider = CreateDeterministicRandomProvider(1); auto timeProvider = CreateDeterministicTimeProvider(10000000); auto t_clone = std::make_unique<TTimer>("clone: "); auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider)); + t_clone.reset(); + auto genData = [&]() { + std::vector<ui64> data; + data.reserve(vecSize); + for (ui64 i = 0; i < vecSize; ++i) { + data.push_back((i + 124515) % 6740234); + } + return data; + }; + + auto testResult = [&] (ui64 acc, ui64 count) { + if (vecSize == 100'000'000) { + UNIT_ASSERT_VALUES_EQUAL(acc, 2614128386688); + UNIT_ASSERT_VALUES_EQUAL(count, 781263); + } else if (vecSize == 10'000'000) { + UNIT_ASSERT_VALUES_EQUAL(acc, 222145217664); + } else { + UNIT_FAIL("result is not checked"); + } + }; + + ui64 kIter = 2; { - TTimer t("graph: "); - TUnboxedValue acc; - for (ui64 i = 0; i < 20; ++i) { + TDuration total; + for (ui64 i = 0; i < kIter; ++i) { + ui64 acc = 0; + ui64 count = 0; + + auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider)); + auto data = genData(); + TUnboxedValue* items = nullptr; + graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items)); + + std::transform(data.cbegin(), data.cend(), items, + [](const auto s) { + return ToValue<ui64>(s); + }); + + TSimpleTimer t; TUnboxedValue v = graph->GetValue(); - if (i == 0) { - Cerr << "len: " << v.GetListLength() << Endl; + while (v.HasValue()) { + acc += v.Get<ui64>(); + ++count; + v = graph->GetValue(); } + testResult(acc, count); + + total += t.Get(); } + Cerr << "graph: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl; } + { - auto t_prepare = std::make_unique<TTimer>("prepare lambda: "); - std::vector<ui64> data; - data.reserve(vecSize); - for (ui64 i = 0; i < vecSize; ++i) { - data.push_back((i + 124515) % 6740234); - } + auto data = genData(); std::function<bool(ui64)> predicate = [](ui64 a) { return a % 128 == 0; }; Y_DO_NOT_OPTIMIZE_AWAY(predicate); - t_prepare.reset(); - TTimer t("lambda: "); - for (ui64 i = 0; i < 20; ++i) { - std::vector<ui64> acc; + TDuration total; + + for (ui64 i = 0; i < kIter; ++i) { + TSimpleTimer t; + ui64 acc = 0; + ui64 count = 0; for (ui64 j = 0; j < data.size(); ++j) { if (predicate(data[j])) { - acc.push_back(data[j]); + acc += data[j]; + ++count; } } - Y_DO_NOT_OPTIMIZE_AWAY(acc); - if (i == 0) { - Cerr << "len: " << acc.size() << Endl; + total += t.Get(); + + testResult(acc, count); + } + Cerr << "std::function: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl; + } + + { + auto data = genData(); + auto predicate = [](ui64 a) { + return a % 128 == 0; + }; + Y_DO_NOT_OPTIMIZE_AWAY(predicate); + + TDuration total; + for (ui64 i = 0; i < kIter; ++i) { + TSimpleTimer t; + ui64 acc = 0; + ui64 count = 0; + for (ui64 j = 0; j < data.size(); ++j) { + if (predicate(data[j])) { + acc += data[j]; + ++count; + } } + + total += t.Get(); + + testResult(acc, count); } + Cerr << "lambda: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl; } } } |