aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2022-07-19 11:52:47 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2022-07-19 11:52:47 +0300
commit5ede2b35f38901718b55de9f9517cf21405c3630 (patch)
treeae1294238c23d3f9ff9b489b1a8fcc0bd88948a5
parentc64ee34d9c321fa796868ddca03dddb174704107 (diff)
downloadydb-5ede2b35f38901718b55de9f9517cf21405c3630.tar.gz
Add UT to detect data races in computation nodes
-rw-r--r--library/python/strings/__init__.py1
-rw-r--r--library/python/strings/strings.py7
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp318
3 files changed, 283 insertions, 43 deletions
diff --git a/library/python/strings/__init__.py b/library/python/strings/__init__.py
index bd6bf6e7ce..c7da1463cf 100644
--- a/library/python/strings/__init__.py
+++ b/library/python/strings/__init__.py
@@ -4,6 +4,7 @@ from .strings import (
DEFAULT_ENCODING,
ENCODING_ERRORS_POLICY,
encode,
+ ensure_str_deep,
fs_encoding,
get_stream_encoding,
guess_default_encoding,
diff --git a/library/python/strings/strings.py b/library/python/strings/strings.py
index 1005b2fe97..916ae96742 100644
--- a/library/python/strings/strings.py
+++ b/library/python/strings/strings.py
@@ -82,10 +82,17 @@ def _convert_deep(x, enc, convert, relaxed=True):
raise TypeError('unsupported type')
+# Same result as six.ensure_text
def unicodize_deep(x, enc=DEFAULT_ENCODING, relaxed=True):
return _convert_deep(x, enc, to_unicode, relaxed)
+# Same result as six.ensure_str
+def ensure_str_deep(x, enc=DEFAULT_ENCODING, relaxed=True):
+    """Run ``_convert_deep`` over ``x`` with ``six.ensure_str`` as the converter."""
+    converter = six.ensure_str
+    return _convert_deep(x, enc, converter, relaxed)
+
+
+# Same result as six.ensure_binary
def stringize_deep(x, enc=DEFAULT_ENCODING, relaxed=True):
return _convert_deep(x, enc, to_str, relaxed)
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
index 499375743f..34ae86148e 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_pattern_cache_ut.cpp
@@ -1,3 +1,5 @@
+#include "library/cpp/threading/local_executor/local_executor.h"
+#include "ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h"
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
#include <ydb/library/yql/minikql/mkql_node_serialization.h>
@@ -19,6 +21,96 @@ namespace NMiniKQL {
using namespace NYql::NUdf;
+// Node factory used by the tests in this file: a callable whose type name is "TestList"
+// is turned into a TExternalComputationNode, every other callable is handed to the
+// builtin factory.
+TComputationNodeFactory GetListTestFactory() {
+    return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+        const bool isTestList = callable.GetType()->GetName() == "TestList";
+        if (!isTestList) {
+            return GetBuiltinFactory()(callable, ctx);
+        }
+        return new TExternalComputationNode(ctx.Mutables);
+    };
+}
+
+// Builds the input flow for the test programs.
+//   list != nullptr: wraps the given callable into a TRuntimeNode and converts it with ToFlow.
+//   list == nullptr: creates vecSize ui64 literals, (i + 124515) % 6740234 for i in [0, vecSize),
+//                    packs them with AsList and converts the result with ToFlow.
+TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+    if (list) {
+        return pb.ToFlow(TRuntimeNode(list, false));
+    }
+
+    // The element type is deliberately non-const: std::vector<const T> is not a valid
+    // instantiation with std::allocator and only builds where the library allows it as an extension.
+    std::vector<TRuntimeNode> arr;
+    arr.reserve(vecSize);
+    for (ui64 i = 0; i < vecSize; ++i) {
+        arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234));
+    }
+    // arrRef refers to arr's elements, so arr has to stay alive (and unmoved) until AsList returns.
+    const TArrayRef<const TRuntimeNode> arrRef(arr);
+    return pb.ToFlow(pb.AsList(arrRef));
+}
+
+// Builds Filter(flow, predicate) on top of CreateFlow(pb, vecSize, list); the predicate is
+// AggrEquals(Mod(item, 128), 0) with both constants wrapped in NewOptional.
+// The TTimer is given the function name followed by ": " as its label.
+TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list) {
+    TTimer t(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    const auto isMultipleOf128 = [&pb](TRuntimeNode item) -> TRuntimeNode {
+        const auto divisor = pb.NewOptional(pb.NewDataLiteral<ui64>(128));
+        const auto remainder = pb.Mod(item, divisor);
+        const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
+        return pb.AggrEquals(remainder, zero);
+    };
+    return pb.Filter(input, isMultipleOf128);
+}
+
+// Same predicate as CreateFilter, but expressed through the wide operators:
+// ExpandMap turns each item into a one-element TList, WideFilter applies
+// AggrEquals(Mod(front, 128), 0) to it, and NarrowMap takes the front element back out.
+TRuntimeNode CreateWideFilter(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+    TTimer t(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+
+    const auto expand = [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
+    const auto isMultipleOf128 = [&](TRuntimeNode::TList items) -> TRuntimeNode {
+        const auto divisor = pb.NewOptional(pb.NewDataLiteral<ui64>(128));
+        const auto remainder = pb.Mod(items.front(), divisor);
+        const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
+        return pb.AggrEquals(remainder, zero);
+    };
+    const auto narrow = [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); };
+
+    const auto wide = pb.ExpandMap(input, expand);
+    const auto filtered = pb.WideFilter(wide, isMultipleOf128);
+    return pb.NarrowMap(filtered, narrow);
+}
+
+// Builds Condense over CreateFlow(pb, vecSize, list) with a ui64 zero as the initial state,
+// a switcher that always yields the literal `false`, and Add(item, state) as the update step.
+TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable *list = nullptr) {
+    TTimer t(TString(__func__) + ": ");
+    const auto input = CreateFlow(pb, vecSize, list);
+    const TRuntimeNode initialState = pb.NewDataLiteral<ui64>(0);
+
+    const auto neverSwitch = [&pb](TRuntimeNode, TRuntimeNode) -> TRuntimeNode {
+        return pb.NewDataLiteral<bool>(false);
+    };
+    const auto accumulate = [&pb](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode {
+        return pb.Add(item, state);
+    };
+    return pb.Condense(input, initialState, neverSwitch, accumulate);
+}
+
+/*
+TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize) {
+ TTimer t(TString(__func__) + ": ");
+ std::vector<const TRuntimeNode> arr;
+ arr.reserve(vecSize);
+ for (ui64 i = 0; i < vecSize; ++i) {
+ arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234));
+ }
+ TArrayRef<const TRuntimeNode> arrRef(std::move(arr));
+ auto flow = pb.ToFlow(pb.AsList(arrRef));
+
+ TWideLambda keyExtractor = [&](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode {
+ return pb.Add(item, state);
+ };
+ TBinaryWideLambda init;
+ TTernaryWideLambda update;
+ TBinaryWideLambda finish;
+ return pb.WideLastCombiner(flow, keyExtractor , init, update, finish);
+ //(flow, state, switcherHandler, updateHandler);
+}
+*/
Y_UNIT_TEST_SUITE(ComputationPatternCache) {
Y_UNIT_TEST(Smoke) {
@@ -57,6 +149,7 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
auto timeProvider = CreateDeterministicTimeProvider(10000000);
TScopedAlloc graphAlloc;
auto patternEnv = cache.Find(key);
+ UNIT_ASSERT(patternEnv);
TComputationPatternOpts opts(patternEnv->Alloc.Ref(), patternEnv->Env, GetBuiltinFactory(),
functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
"OFF", EGraphPerProcess::Multi);
@@ -67,6 +160,94 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
}
}
+    // Builds the program returned by `f` once, stores its computation pattern in an LRU pattern
+    // cache under the key "a", then runs `inFlight` concurrent tasks. Each task repeats 100 times:
+    // look the pattern up in the cache, clone a graph from it, feed the same input list through
+    // entry point 0 and sum the ui64 values the graph produces. Every run must sum to `testResult`.
+    //
+    //   f          - callback invoked as f(pb, vecSize, list); returns the root node of the program.
+    //   testResult - expected sum of all values produced by a single graph run.
+    template<class T>
+    void ParallelProgTest(T f, ui64 testResult) {
+        const ui32 cacheSize = 10;
+        const ui32 inFlight = 2;
+        TComputationPatternLRUCache cache(cacheSize);
+
+        auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
+        std::shared_ptr<TPatternWithEnv> patternEnv = cache.CreateEnv();
+        TScopedAlloc &alloc = patternEnv->Alloc;
+        TTypeEnvironment &typeEnv = patternEnv->Env;
+
+        TProgramBuilder pb(typeEnv, *functionRegistry);
+
+        // "TestList" is the callable name GetListTestFactory() maps to an external node;
+        // the actual list contents are supplied per graph via SetValue below.
+        const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
+        const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
+        const ui32 vecSize = 10'000;
+        auto progReturn = f(pb, vecSize, list);
+
+        TExploringNodeVisitor explorer;
+        explorer.Walk(progReturn.GetNode(), typeEnv);
+
+        TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(), functionRegistry.Get(),
+            NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Multi);
+
+        {
+            auto guard = patternEnv->Env.BindAllocator();
+            patternEnv->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
+        }
+        cache.EmplacePattern("a", patternEnv);
+
+        // Input shared (read-only) by all tasks: (i + 124515) % 6740234 for i in [0, vecSize).
+        auto genData = [&]() {
+            std::vector<ui64> data;
+            data.reserve(vecSize);
+            for (ui64 i = 0; i < vecSize; ++i) {
+                data.push_back((i + 124515) % 6740234);
+            }
+            return data;
+        };
+
+        const auto data = genData();
+
+        NPar::LocalExecutor().RunAdditionalThreads(inFlight);
+        NPar::LocalExecutor().ExecRange([&](int /*id*/) {
+            for (ui32 i = 0; i < 100; ++i) {
+                auto key = "a";
+
+                auto randomProvider = CreateDeterministicRandomProvider(1);
+                auto timeProvider = CreateDeterministicTimeProvider(10000000);
+                TScopedAlloc graphAlloc;
+
+                // Named differently from the outer patternEnv/opts to avoid shadowing them.
+                auto cachedEnv = cache.Find(key);
+                // Fail the test instead of dereferencing an empty pointer if the lookup misses;
+                // this is the same check the suite performs after its other cache.Find call.
+                UNIT_ASSERT(cachedEnv);
+
+                TComputationPatternOpts cloneOpts(cachedEnv->Alloc.Ref(), cachedEnv->Env, GetListTestFactory(),
+                    functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
+                    "OFF", EGraphPerProcess::Multi);
+
+                auto graph = cachedEnv->Pattern->Clone(cloneOpts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
+                TUnboxedValue* items = nullptr;
+                graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items));
+
+                // Fill the freshly created array holder with the input values.
+                std::transform(data.cbegin(), data.cend(), items,
+                    [](const auto s) {
+                        return ToValue<ui64>(s);
+                    });
+
+                ui64 acc = 0;
+                TUnboxedValue v = graph->GetValue();
+                // At least one value is expected before the graph stops producing values.
+                UNIT_ASSERT(v.HasValue());
+                while (v.HasValue()) {
+                    acc += v.Get<ui64>();
+                    v = graph->GetValue();
+                }
+                UNIT_ASSERT_VALUES_EQUAL(acc, testResult);
+            }
+        }, 0, inFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
+    }
+
+    // ParallelProgTest feeds the values 124515..134514; the multiples of 128 among them
+    // (124544, 124672, ..., 134400 - 78 values) add up to 10098816.
+    Y_UNIT_TEST(ParallelFilter) {
+        const ui64 expectedSum = 10098816;
+        ParallelProgTest(CreateFilter, expectedSum);
+    }
+
+    // Same expected sum as for the plain filter: the multiples of 128 within 124515..134514
+    // add up to 10098816.
+    Y_UNIT_TEST(ParallelWideFilter) {
+        const ui64 expectedSum = 10098816;
+        ParallelProgTest(CreateWideFilter, expectedSum);
+    }
+
+    // The expected value equals the sum of all 10'000 inputs fed by ParallelProgTest:
+    // 124515 + 124516 + ... + 134514 = 1295145000.
+    Y_UNIT_TEST(ParallelCondense) {
+        const ui64 expectedSum = 1295145000;
+        ParallelProgTest(CreateCondense, expectedSum);
+    }
+
Y_UNIT_TEST(AddPerf) {
TTimer t("all: ");
TScopedAlloc alloc;
@@ -109,7 +290,6 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
std::function<ui64(ui64, ui64)> add = [](ui64 a, ui64 b) {
return a + b;
};
- Y_DO_NOT_OPTIMIZE_AWAY(add);
TTimer t("lambda: ");
ui64 acc = 0;
@@ -137,89 +317,141 @@ Y_UNIT_TEST_SUITE(ComputationPatternCache) {
}
}
-TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize) {
- TTimer t(__func__);
- std::vector<const TRuntimeNode> arr;
- arr.reserve(vecSize);
- for (ui64 i = 0; i < vecSize; ++i) {
- arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234));
- }
- TArrayRef<const TRuntimeNode> arrRef(std::move(arr));
- auto arrayNode = pb.AsList(arrRef);
- auto handler = [&](TRuntimeNode node) -> TRuntimeNode {
- return pb.AggrEquals(
- pb.Mod(node, pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
- pb.NewOptional(pb.NewDataLiteral<ui64>(0)));
- };
- return pb.Filter(arrayNode, handler);
-}
-
Y_UNIT_TEST(FilterPerf) {
- TTimer t("all: ");
TScopedAlloc alloc;
TTypeEnvironment typeEnv(alloc);
auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
TProgramBuilder pb(typeEnv, *functionRegistry);
- const ui64 vecSize = 1'000'000;
- auto progReturn = CreateFilter(pb, vecSize);
+ const ui64 vecSize = 100'000'000;
+ Cerr << "vecSize: " << vecSize << Endl;
+ const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
+ const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
+ auto progReturn = true ? CreateFilter(pb, vecSize, list) : CreateWideFilter(pb, vecSize, list);
TExploringNodeVisitor explorer;
explorer.Walk(progReturn.GetNode(), typeEnv);
- NUdf::EValidateMode validateMode = NUdf::EValidateMode::Lazy;
- TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(),
+ NUdf::EValidateMode validateMode = NUdf::EValidateMode::Max;
+ TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(),
functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception,
"OFF", EGraphPerProcess::Multi);
auto t_make_pattern = std::make_unique<TTimer>("make_pattern: ");
- auto pattern = MakeComputationPattern(explorer, progReturn, {}, opts);
+ auto pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
t_make_pattern.reset();
+
auto randomProvider = CreateDeterministicRandomProvider(1);
auto timeProvider = CreateDeterministicTimeProvider(10000000);
auto t_clone = std::make_unique<TTimer>("clone: ");
auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
+
t_clone.reset();
+ auto genData = [&]() {
+ std::vector<ui64> data;
+ data.reserve(vecSize);
+ for (ui64 i = 0; i < vecSize; ++i) {
+ data.push_back((i + 124515) % 6740234);
+ }
+ return data;
+ };
+
+ auto testResult = [&] (ui64 acc, ui64 count) {
+ if (vecSize == 100'000'000) {
+ UNIT_ASSERT_VALUES_EQUAL(acc, 2614128386688);
+ UNIT_ASSERT_VALUES_EQUAL(count, 781263);
+ } else if (vecSize == 10'000'000) {
+ UNIT_ASSERT_VALUES_EQUAL(acc, 222145217664);
+ } else {
+ UNIT_FAIL("result is not checked");
+ }
+ };
+
+ ui64 kIter = 2;
{
- TTimer t("graph: ");
- TUnboxedValue acc;
- for (ui64 i = 0; i < 20; ++i) {
+ TDuration total;
+ for (ui64 i = 0; i < kIter; ++i) {
+ ui64 acc = 0;
+ ui64 count = 0;
+
+ auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
+ auto data = genData();
+ TUnboxedValue* items = nullptr;
+ graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items));
+
+ std::transform(data.cbegin(), data.cend(), items,
+ [](const auto s) {
+ return ToValue<ui64>(s);
+ });
+
+ TSimpleTimer t;
TUnboxedValue v = graph->GetValue();
- if (i == 0) {
- Cerr << "len: " << v.GetListLength() << Endl;
+ while (v.HasValue()) {
+ acc += v.Get<ui64>();
+ ++count;
+ v = graph->GetValue();
}
+ testResult(acc, count);
+
+ total += t.Get();
}
+ Cerr << "graph: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
}
+
{
- auto t_prepare = std::make_unique<TTimer>("prepare lambda: ");
- std::vector<ui64> data;
- data.reserve(vecSize);
- for (ui64 i = 0; i < vecSize; ++i) {
- data.push_back((i + 124515) % 6740234);
- }
+ auto data = genData();
std::function<bool(ui64)> predicate = [](ui64 a) {
return a % 128 == 0;
};
Y_DO_NOT_OPTIMIZE_AWAY(predicate);
- t_prepare.reset();
- TTimer t("lambda: ");
- for (ui64 i = 0; i < 20; ++i) {
- std::vector<ui64> acc;
+ TDuration total;
+
+ for (ui64 i = 0; i < kIter; ++i) {
+ TSimpleTimer t;
+ ui64 acc = 0;
+ ui64 count = 0;
for (ui64 j = 0; j < data.size(); ++j) {
if (predicate(data[j])) {
- acc.push_back(data[j]);
+ acc += data[j];
+ ++count;
}
}
- Y_DO_NOT_OPTIMIZE_AWAY(acc);
- if (i == 0) {
- Cerr << "len: " << acc.size() << Endl;
+ total += t.Get();
+
+ testResult(acc, count);
+ }
+ Cerr << "std::function: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
+ }
+
+ {
+ auto data = genData();
+ auto predicate = [](ui64 a) {
+ return a % 128 == 0;
+ };
+ Y_DO_NOT_OPTIMIZE_AWAY(predicate);
+
+ TDuration total;
+ for (ui64 i = 0; i < kIter; ++i) {
+ TSimpleTimer t;
+ ui64 acc = 0;
+ ui64 count = 0;
+ for (ui64 j = 0; j < data.size(); ++j) {
+ if (predicate(data[j])) {
+ acc += data[j];
+ ++count;
+ }
}
+
+ total += t.Get();
+
+ testResult(acc, count);
}
+ Cerr << "lambda: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
}
}
}