aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzverevgeny <zverevgeny@ydb.tech>2023-09-22 12:56:57 +0300
committerzverevgeny <zverevgeny@ydb.tech>2023-09-22 13:35:24 +0300
commitfc805a145e9dd59a78bca584f85fd5178a6f75ad (patch)
tree0c4b2c33f8724974cbed0fcb7bb937c3f040cfcc
parent91ddf2d8e0ffdd440b939e76a29b8c4c538f6c65 (diff)
downloadydb-fc805a145e9dd59a78bca584f85fd5178a6f75ad.tar.gz
YQL-16443 remove duplicate
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h6
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp66
2 files changed, 14 insertions, 58 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h b/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h
index 41eb68dd0c8..9b5ebdad7ce 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h
@@ -83,6 +83,7 @@ template<bool UseLLVM>
struct TSetup {
TSetup(TComputationNodeFactory nodeFactory = {}, TVector<TUdfModuleInfo>&& modules = {})
: Alloc(__LOCATION__)
+ , StatsRegistry(CreateDefaultStatsRegistry())
{
NodeFactory = nodeFactory;
FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry());
@@ -107,7 +108,7 @@ struct TSetup {
Explorer.Walk(pgm.GetNode(), *Env);
TComputationPatternOpts opts(Alloc.Ref(), *Env, GetTestFactory(NodeFactory),
FunctionRegistry.Get(), NUdf::EValidateMode::None, NUdf::EValidatePolicy::Exception,
- UseLLVM ? "" : "OFF", EGraphPerProcess::Multi, nullptr, nullptr, nullptr);
+ UseLLVM ? "" : "OFF", EGraphPerProcess::Multi, StatsRegistry.Get(), nullptr, nullptr);
Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts);
auto graph = Pattern->Clone(opts.ToComputationOptions(*RandomProvider, *TimeProvider));
Terminator.Reset(new TBindTerminator(graph->GetTerminator()));
@@ -119,12 +120,13 @@ struct TSetup {
Pattern.Reset();
}
+ TScopedAlloc Alloc;
TComputationNodeFactory NodeFactory;
TIntrusivePtr<IFunctionRegistry> FunctionRegistry;
TIntrusivePtr<IRandomProvider> RandomProvider;
TIntrusivePtr<ITimeProvider> TimeProvider;
+ IStatsRegistryPtr StatsRegistry;
- TScopedAlloc Alloc;
THolder<TTypeEnvironment> Env;
THolder<TProgramBuilder> PgmBuilder;
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
index b47cfc222ef..bb2e22a00c3 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
@@ -1,4 +1,5 @@
#include "../mkql_multihopping.h"
+#include "mkql_computation_node_ut.h"
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
@@ -56,13 +57,7 @@ namespace {
return output;
}
- TIntrusivePtr<IRandomProvider> CreateRandomProvider() {
- return CreateDeterministicRandomProvider(1);
- }
- TIntrusivePtr<ITimeProvider> CreateTimeProvider() {
- return CreateDeterministicTimeProvider(10000000);
- }
TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark) {
return [&watermark](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
@@ -76,47 +71,6 @@ namespace {
};
}
- struct TSetup {
- TSetup(TScopedAlloc& alloc, TWatermark& watermark, bool watermarkMode = false)
- : FunctionRegistry(CreateFunctionRegistry(CreateBuiltinRegistry()))
- , RandomProvider(CreateRandomProvider())
- , TimeProvider(CreateTimeProvider())
- , StatsResgistry(CreateDefaultStatsRegistry())
- , Alloc(alloc)
- , Watermark(watermark)
- , WatermarkMode(watermarkMode)
- {
- Env.Reset(new TTypeEnvironment(Alloc));
- PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry));
- }
-
- THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) {
- Explorer.Walk(pgm.GetNode(), *Env);
- TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(Watermark),
- FunctionRegistry.Get(),
- NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi,
- StatsResgistry.Get());
- Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts);
- TComputationOptsFull compOpts = opts.ToComputationOptions(*RandomProvider, *TimeProvider);
- return Pattern->Clone(compOpts);
- }
-
- TIntrusivePtr<IFunctionRegistry> FunctionRegistry;
- TIntrusivePtr<IRandomProvider> RandomProvider;
- TIntrusivePtr<ITimeProvider> TimeProvider;
- IStatsRegistryPtr StatsResgistry;
-
- TScopedAlloc& Alloc;
- THolder<TTypeEnvironment> Env;
- THolder<TProgramBuilder> PgmBuilder;
-
- TExploringNodeVisitor Explorer;
- IComputationPattern::TPtr Pattern;
-
- TWatermark& Watermark;
- bool WatermarkMode;
- };
-
struct TStream : public NUdf::TBoxedValue {
TStream(const TUnboxedValueVector& items, std::function<void()> fetchCallback, bool* yield)
: Items(items)
@@ -143,7 +97,8 @@ namespace {
};
THolder<IComputationGraph> BuildGraph(
- TSetup& setup,
+ TSetup<false>& setup,
+ bool watermarkMode,
const std::vector<TInputItem> items,
std::function<void()> fetchCallback,
bool dataWatermarks,
@@ -218,7 +173,7 @@ namespace {
pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval
pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))), // delay
pgmBuilder.NewDataLiteral<bool>(dataWatermarks),
- pgmBuilder.NewDataLiteral<bool>(setup.WatermarkMode)
+ pgmBuilder.NewDataLiteral<bool>(watermarkMode)
);
auto graph = setup.BuildGraph(pgmReturn, {streamNode});
@@ -247,7 +202,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
ui64 hop = 10,
ui64 interval = 30,
ui64 delay = 20,
- std::function<void(ui32, TSetup&)> customCheck = [](ui32, TSetup&){},
+ std::function<void(ui32, TSetup<false>&)> customCheck = [](ui32, TSetup<false>&){},
TWatermark* watermark = nullptr,
bool* yield = nullptr,
std::function<void()> fetch_callback= [](){},
@@ -264,8 +219,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
if (watermark == nullptr) {
watermark = &watermark_clone;
}
- TScopedAlloc alloc(__LOCATION__);
- TSetup setup1(alloc, *watermark, watermarkMode);
+ TSetup<false> setup1(GetAuxCallableFactory(*watermark));
ui32 curGroupId = 0;
std::vector<TOutputItem> curResult;
@@ -280,7 +234,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
curResult.clear();
};
- auto graph1 = BuildGraph(setup1, input, check, dataWatermarks, yield, hop, interval, delay);
+ auto graph1 = BuildGraph(setup1, watermarkMode, input, check, dataWatermarks, yield, hop, interval, delay);
auto root1 = graph1->GetValue();
@@ -319,7 +273,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
++inp_index;
}
};
- TestImpl(input, expected, false, 10, 30, 20, [](ui32, TSetup&){}, &watermark, &yield, avant_fetch, true);
+ TestImpl(input, expected, false, 10, 30, 20, [](ui32, TSetup<false>&){}, &watermark, &yield, avant_fetch, true);
}
Y_UNIT_TEST(TestThrowWatermarkFromPast) {
@@ -492,12 +446,12 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
TOutputGroup({{2, 1, 160}, {2, 1, 170}, {2, 1, 180}}),
};
TestImpl(input, expected, true, 10, 30, 20,
- [](ui32 curGroup, TSetup& setup) {
+ [](ui32 curGroup, TSetup<false>& setup) {
if (curGroup != 2) {
return;
}
- setup.StatsResgistry->ForEachStat([](const TStatKey& key, i64 value) {
+ setup.StatsRegistry->ForEachStat([](const TStatKey& key, i64 value) {
if (key.GetName() == "MultiHop_KeysCount") {
UNIT_ASSERT_EQUAL_C(value, 1, "actual: " << value << " expected: " << 1);
}