diff options
author | Pavel Zuev <uniquelogin@ydb.tech> | 2025-04-03 21:27:38 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-03 16:27:38 +0000 |
commit | d2c5ec0ba9115cf7dcb2c6f1525677361c8898cc (patch) | |
tree | 63970175f52ee05f303992d7e0eeb44f096592ce | |
parent | 54026678e90cc945c55076444303cebd36837d0e (diff) | |
download | ydb-d2c5ec0ba9115cf7dcb2c6f1525677361c8898cc.tar.gz |
update all tests to use <string, ui64> tuples; add markdown-table result printer (#16746)
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/bin/main.cpp | 167 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/bin/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/converters.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/printout.h | 15 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/run_params.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/simple.cpp | 106 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/simple.h | 5 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/simple_block.cpp | 216 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/simple_block.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/simple_last.cpp | 266 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/simple_last.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/streams.cpp | 48 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/streams.h | 290 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/ya.make | 5 |
14 files changed, 652 insertions, 476 deletions
diff --git a/ydb/core/kqp/tools/combiner_perf/bin/main.cpp b/ydb/core/kqp/tools/combiner_perf/bin/main.cpp index 6894fbc09d..ef62a36e6c 100644 --- a/ydb/core/kqp/tools/combiner_perf/bin/main.cpp +++ b/ydb/core/kqp/tools/combiner_perf/bin/main.cpp @@ -1,3 +1,4 @@ +#include <ydb/core/kqp/tools/combiner_perf/printout.h> #include <ydb/core/kqp/tools/combiner_perf/simple_last.h> #include <ydb/core/kqp/tools/combiner_perf/simple.h> #include <ydb/core/kqp/tools/combiner_perf/tpch_last.h> @@ -7,61 +8,155 @@ #include <util/stream/output.h> #include <util/stream/file.h> +#include <util/string/printf.h> #include <util/system/compiler.h> -void DoFullPass(bool withSpilling) +using NKikimr::NMiniKQL::TRunParams; + +class TPrintingResultCollector : public TTestResultCollector { - using namespace NKikimr::NMiniKQL; +public: + virtual void SubmitTestNameAndParams(const TRunParams& runParams, const char* testName, const std::optional<bool> llvm, const std::optional<bool> spilling) override + { + Cout << "------------------------------" << Endl; + Cout << testName; + if (llvm.has_value()) { + Cout << ", " << (llvm.value() ? "+" : "-") << "llvm"; + } + if (spilling.has_value()) { + Cout << ", " << (spilling.value() ? "+" : "-") << "spilling"; + } + Cout << Endl; + Cout << "Data rows total: " << runParams.RowsPerRun << " x " << runParams.NumRuns << Endl; + Cout << (runParams.MaxKey + 1) << " distinct numeric keys" << Endl; + Cout << "Block size: " << runParams.BlockSize << Endl; + Cout << "Long strings: " << (runParams.LongStringKeys ? "yes" : "no") << Endl; + Cout << Endl; + } - TRunParams runParams; + virtual void SubmitTimings(const TDuration& graphTime, const TDuration& referenceTime, const std::optional<TDuration> streamTime) override + { + Cout << "Graph runtime is: " << graphTime << " vs. reference C++ implementation: " << referenceTime << Endl; - runParams.NumRuns = 20; - runParams.RowsPerRun = 5'000'000; - runParams.MaxKey = 200'000 - 1; - runParams.BlockSize = 5'000; - runParams.LongStringKeys = true; + if (streamTime.has_value()) { + Cout << "Input stream own iteration time: " << *streamTime << Endl; + Cout << "Graph time - stream own time = " << (*streamTime <= graphTime ? graphTime - *streamTime : TDuration::Zero()) << Endl; + Cout << "C++ implementation time - devnull time = " << (*streamTime <= referenceTime ? referenceTime - *streamTime : TDuration::Zero()) << Endl; + } + } +}; - RunTestBlockCombineHashedSimple<false, false>(runParams); +class TWikiResultCollector : public TTestResultCollector +{ +public: + TWikiResultCollector() + { + Cout << "#|" << Endl; + Cout << "|| Test name | LLVM | Spilling | RowsTotal | Distinct keys | Block size | Input stream own time (s) | Graph time - stream time (s) | C++ time - stream time (s) | Shame ratio ||" << Endl; + } - auto doSimpleLast = [](const TRunParams& params) { - Cerr << "LastSimple, -llvm, -spilling" << Endl; - NKikimr::NMiniKQL::RunTestCombineLastSimple<false, false>(params); + ~TWikiResultCollector() + { + Cout << "|#" << Endl; + } - if (false) { - Cerr << "LastSimple, +llvm, -spilling" << Endl; - NKikimr::NMiniKQL::RunTestCombineLastSimple<true, false>(params); + virtual void SubmitTestNameAndParams(const TRunParams& runParams, const char* testName, const std::optional<bool> llvm, const std::optional<bool> spilling) override + { + Cout << "|| "; + Cout << testName << " | "; + if (llvm.has_value()) { + Cout << (llvm.value() ? "+" : " "); } - }; + Cout << " | "; + if (spilling.has_value()) { + Cout << (spilling.value() ? "+" : " "); + } + Cout << " | "; - doSimpleLast(runParams); + Cout << (runParams.RowsPerRun * runParams.NumRuns) << " | " << (runParams.MaxKey + 1) << " | "; + if (TStringBuf(testName).Contains("Block")) { + Cout << runParams.BlockSize; + } + Cout << " | "; + } - if (false) { - Cerr << "Simple, -llvm, -spilling" << Endl; - NKikimr::NMiniKQL::RunTestSimple<false>(); + static TString FancyDuration(const TDuration duration) + { + const auto ms = duration.MilliSeconds(); + if (!ms) { + return " "; + } + return Sprintf("%.2f", (ms / 1000.0)); + } - Cerr << "Simple, +llvm, -spilling" << Endl; - NKikimr::NMiniKQL::RunTestSimple<true>(); + virtual void SubmitTimings(const TDuration& graphTime, const TDuration& referenceTime, const std::optional<TDuration> streamTime) override + { + TDuration streamTimeOrZero = (streamTime.has_value()) ? streamTime.value() : TDuration::Zero(); + TDuration corrGraphTime = streamTimeOrZero <= graphTime ? graphTime - streamTimeOrZero : TDuration::Zero(); + TDuration corrRefTime = streamTimeOrZero <= referenceTime ? referenceTime - streamTimeOrZero : TDuration::Zero(); - Cerr << "LastTpch, -llvm, -spilling" << Endl; - NKikimr::NMiniKQL::RunTestLastTpch<false, false>(); + TString diff; + if (corrRefTime.MilliSeconds() > 0) { + diff = Sprintf("%.2f", corrGraphTime.MilliSeconds() * 1.0 / corrRefTime.MilliSeconds()); + } - Cerr << "LastTpch, +llvm, -spilling" << Endl; - NKikimr::NMiniKQL::RunTestLastTpch<true, false>(); + Cout << FancyDuration(streamTimeOrZero) << " | " << FancyDuration(corrGraphTime) << " | " << FancyDuration(corrRefTime) << " | " << diff << " ||" << Endl; + Cout.Flush(); } +}; - if (withSpilling) { - Cerr << "LastSimple, -llvm, +spilling" << Endl; - NKikimr::NMiniKQL::RunTestCombineLastSimple<false, true>(runParams); +void DoFullPass(bool withSpilling) +{ + using namespace NKikimr::NMiniKQL; - Cerr << "LastSimple, +llvm, +spilling" << Endl; - NKikimr::NMiniKQL::RunTestCombineLastSimple<true, true>(runParams); + TWikiResultCollector printout; - Cerr << "LastTpch, -llvm, +spilling" << Endl; - NKikimr::NMiniKQL::RunTestLastTpch<false, true>(); + TRunParams runParams; - Cerr << "LastTpch, +llvm, +spilling" << Endl; - NKikimr::NMiniKQL::RunTestLastTpch<true, true>(); - } + runParams.NumRuns = 20; + runParams.RowsPerRun = 5'000'000; + runParams.MaxKey = 1'00 - 1; + runParams.LongStringKeys = false; + + const std::vector<size_t> numKeys = {4u, 1000u, 100'000u, 200'000u}; + const std::vector<size_t> blockSizes = {128u, 8192u}; + + auto doSimple = [&printout, numKeys](const TRunParams& params) { + for (size_t keyCount : numKeys) { + auto runParams = params; + runParams.MaxKey = keyCount - 1; + RunTestSimple<false>(runParams, printout); + RunTestSimple<true>(runParams, printout); + } + }; + + auto doSimpleLast = [&printout, &numKeys, withSpilling](const TRunParams& params) { + for (size_t keyCount : numKeys) { + auto runParams = params; + runParams.MaxKey = keyCount - 1; + RunTestCombineLastSimple<false, false>(runParams, printout); + RunTestCombineLastSimple<true, false>(runParams, printout); + if (withSpilling) { + RunTestCombineLastSimple<false, true>(runParams, printout); + RunTestCombineLastSimple<true, true>(runParams, printout); + } + } + }; + + auto doBlockHashed = [&printout, &numKeys, &blockSizes](const TRunParams& params) { + for (size_t keyCount : numKeys) { + for (size_t blockSize : blockSizes) { + auto runParams = params; + runParams.MaxKey = keyCount - 1; + runParams.BlockSize = blockSize; + RunTestBlockCombineHashedSimple<false, false>(runParams, printout); + } + } + }; + + doSimple(runParams); + doSimpleLast(runParams); + doBlockHashed(runParams); } int main(int argc, const char* argv[]) diff --git a/ydb/core/kqp/tools/combiner_perf/bin/ya.make b/ydb/core/kqp/tools/combiner_perf/bin/ya.make index ec0196df29..2bce61f2f7 100644 --- a/ydb/core/kqp/tools/combiner_perf/bin/ya.make +++ b/ydb/core/kqp/tools/combiner_perf/bin/ya.make @@ -21,4 +21,3 @@ SRCS( ) END() - diff --git a/ydb/core/kqp/tools/combiner_perf/converters.h b/ydb/core/kqp/tools/combiner_perf/converters.h index 8d63107a8d..76a76062ff 100644 --- a/ydb/core/kqp/tools/combiner_perf/converters.h +++ b/ydb/core/kqp/tools/combiner_perf/converters.h @@ -1,3 +1,5 @@ +#pragma once + #include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h> #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h> #include <yql/essentials/minikql/comp_nodes/mkql_factories.h> diff --git a/ydb/core/kqp/tools/combiner_perf/printout.h b/ydb/core/kqp/tools/combiner_perf/printout.h new file mode 100644 index 0000000000..0f2f431e89 --- /dev/null +++ b/ydb/core/kqp/tools/combiner_perf/printout.h @@ -0,0 +1,15 @@ +#pragma once + +#include "run_params.h" + +#include <util/datetime/base.h> +#include <optional> + +class TTestResultCollector { +public: + virtual void SubmitTestNameAndParams(const NKikimr::NMiniKQL::TRunParams& runParams, const char* testName, const std::optional<bool> llvm = {}, const std::optional<bool> spilling = {}) = 0; + + virtual void SubmitTimings(const TDuration& graphTime, const TDuration& referenceTime, const std::optional<TDuration> streamTime = {}) = 0; + + virtual ~TTestResultCollector() {}; +}; diff --git a/ydb/core/kqp/tools/combiner_perf/run_params.h b/ydb/core/kqp/tools/combiner_perf/run_params.h index c3c350d259..a12576e2d2 100644 --- a/ydb/core/kqp/tools/combiner_perf/run_params.h +++ b/ydb/core/kqp/tools/combiner_perf/run_params.h @@ -2,7 +2,6 @@ #include <util/system/defaults.h> - namespace NKikimr { namespace NMiniKQL { diff --git a/ydb/core/kqp/tools/combiner_perf/simple.cpp b/ydb/core/kqp/tools/combiner_perf/simple.cpp index b9cc26eaae..6ada8c91d3 100644 --- a/ydb/core/kqp/tools/combiner_perf/simple.cpp +++ b/ydb/core/kqp/tools/combiner_perf/simple.cpp @@ -1,5 +1,8 @@ #include "simple.h" + #include "factories.h" +#include "streams.h" +#include "printout.h" #include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h> #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h> @@ -13,74 +16,71 @@ namespace NKikimr { namespace NMiniKQL { -std::vector<std::pair<i8, double>> MakeSamples() { - constexpr auto total_samples = 100'000'000ULL; - - std::default_random_engine eng; - std::uniform_int_distribution<int> keys(-100, +100); - std::uniform_real_distribution<double> unif(-999.0, +999.0); - - std::vector<std::pair<i8, double>> samples(total_samples); - - eng.seed(std::time(nullptr)); - std::generate(samples.begin(), samples.end(), std::bind(&std::make_pair<i8, double>, std::bind(std::move(keys), std::move(eng)), std::bind(std::move(unif), std::move(eng)))); - return samples; -} - -const std::vector<std::pair<i8, double>> I8Samples = MakeSamples(); - template<bool LLVM> -void RunTestSimple() +void RunTestSimple(const TRunParams& params, TTestResultCollector& printout) { TSetup<LLVM> setup(GetPerfTestFactory()); - Cerr << "Simple i8 sample has " << I8Samples.size() << " rows" << Endl; + printout.SubmitTestNameAndParams(params, __func__, LLVM); - double positive = 0.0, negative = 0.0; - const auto t = TInstant::Now(); - for (const auto& sample : I8Samples) { - (sample.second > 0.0 ? positive : negative) += sample.second; - } - const auto cppTime = TInstant::Now() - t; + TString64DataSampler sampler(params.RowsPerRun, params.MaxKey, params.NumRuns, params.LongStringKeys); + // or T6464DataSampler sampler(numSamples, maxKey, numIters); -- maybe make selectable from params + Cerr << "Sampler type: " << sampler.Describe() << Endl; TProgramBuilder& pb = *setup.PgmBuilder; - const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id)); - const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build(); - - const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)), - [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }), 0ULL, - [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {pb.AggrGreater(items.front(), pb.NewDataLiteral(0.0))}; }, - [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return items; }, - [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.AggrAdd(state.front(), items.front())}; }, - [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList { return state; }), - [&](TRuntimeNode::TList items) { return items.front(); } + const auto streamItemType = pb.NewMultiType({sampler.GetKeyType(pb), pb.NewDataType(NUdf::TDataType<ui64>::Id)}); + const auto streamType = pb.NewStreamType(streamItemType); + const auto streamCallable = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", streamType).Build(); + + const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner( + pb.ToFlow(TRuntimeNode(streamCallable, false)), + 0ULL, + [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return { items.front() }; }, + [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return { items.back() } ; }, + [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { + return {pb.AggrAdd(state.front(), items.back())}; + }, + [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList { + return {keys.front(), state.front()}; + }), + [&](TRuntimeNode::TList items) { return pb.NewTuple(items); } )); - const auto graph = setup.BuildGraph(pgmReturn, {list}); - NUdf::TUnboxedValue* items = nullptr; - graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items)); - std::transform(I8Samples.cbegin(), I8Samples.cend(), items, [](const std::pair<i8, double> s){ return ToValue<double>(s.second); }); + const auto graph = setup.BuildGraph(pgmReturn, {streamCallable}); - const auto t1 = TInstant::Now(); - const auto& value = graph->GetValue(); - const auto first = value.GetElement(0); - const auto second = value.GetElement(1); - const auto t2 = TInstant::Now(); - - if (first.template Get<double>() > 0.0) { - UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), positive); - UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), negative); - } else { - UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), negative); - UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), positive); + // Measure the input stream run time + const auto devnullStream = sampler.MakeStream(graph->GetHolderFactory()); + const auto devnullStart = TInstant::Now(); + { + NUdf::TUnboxedValue columns[2]; + while (devnullStream->WideFetch(columns, 2) == NUdf::EFetchStatus::Ok) { + } } + const auto devnullTime = TInstant::Now() - devnullStart; + + // Reference implementation (sum via an std::unordered_map) + auto referenceStream = sampler.MakeStream(graph->GetHolderFactory()); + const auto t = TInstant::Now(); + sampler.ComputeReferenceResult(*referenceStream); + const auto cppTime = TInstant::Now() - t; + + // Compute graph implementation + auto myStream = NUdf::TUnboxedValuePod(sampler.MakeStream(graph->GetHolderFactory()).Release()); + graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(myStream)); + + const auto graphTimeStart = TInstant::Now(); + const auto& value = graph->GetValue(); + const auto graphTime = TInstant::Now() - graphTimeStart; + + // Verification + sampler.VerifyComputedValueVsReference(value); - Cerr << "WideCombiner graph runtime is: " << t2 - t1 << " vs. reference C++ implementation: " << cppTime << Endl << Endl; + printout.SubmitTimings(graphTime, cppTime, devnullTime); } -template void RunTestSimple<false>(); -template void RunTestSimple<true>(); +template void RunTestSimple<false>(const TRunParams& params, TTestResultCollector& printout); +template void RunTestSimple<true>(const TRunParams& params, TTestResultCollector& printout); } } diff --git a/ydb/core/kqp/tools/combiner_perf/simple.h b/ydb/core/kqp/tools/combiner_perf/simple.h index e2bc3f44da..193312919f 100644 --- a/ydb/core/kqp/tools/combiner_perf/simple.h +++ b/ydb/core/kqp/tools/combiner_perf/simple.h @@ -1,10 +1,13 @@ #pragma once +#include "run_params.h" +#include "printout.h" + namespace NKikimr { namespace NMiniKQL { template<bool LLVM> -void RunTestSimple(); +void RunTestSimple(const TRunParams& params, TTestResultCollector& printout); } } diff --git a/ydb/core/kqp/tools/combiner_perf/simple_block.cpp b/ydb/core/kqp/tools/combiner_perf/simple_block.cpp index 9b9a71351b..52ebdc61ca 100644 --- a/ydb/core/kqp/tools/combiner_perf/simple_block.cpp +++ b/ydb/core/kqp/tools/combiner_perf/simple_block.cpp @@ -1,7 +1,8 @@ #include "simple_block.h" #include "factories.h" -#include "converters.h" +#include "streams.h" +#include "printout.h" #include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h> #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h> @@ -11,6 +12,8 @@ #include <contrib/libs/apache/arrow/cpp/src/arrow/type_fwd.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_primitive.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/chunked_array.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/array.h> #include <library/cpp/testing/unittest/registar.h> @@ -19,117 +22,10 @@ namespace NKikimr { namespace NMiniKQL { +template<typename K> +void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<K, uint64_t>& result); -using T6464Samples = std::vector<std::pair<uint64_t, uint64_t>>; -using TString64Samples = std::vector<std::pair<std::string, uint64_t>>; - -T6464Samples MakeKeyed6464SamplesForBlocks(const size_t numSamples, const unsigned int maxKey) { - std::default_random_engine eng; - std::uniform_int_distribution<unsigned int> keys(0, maxKey); - std::uniform_int_distribution<uint64_t> unif(0, 100000.0); - - T6464Samples samples(numSamples); - - eng.seed(std::time(nullptr)); - std::generate(samples.begin(), samples.end(), - [&]() -> auto { - return std::make_pair<uint64_t, uint64_t>(keys(eng), unif(eng)); - } - ); - - return samples; -} - -struct IWideBlockStream : public NUdf::TBoxedValue -{ - NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final override { - Y_UNUSED(result); - ythrow yexception() << "only WideFetch is supported here"; - } -}; - -template<typename K, typename V> -struct TBlockStream : public IWideBlockStream { - using TSamples = std::vector<std::pair<K, V>>; - - TBlockStream(TComputationContext& ctx, const TSamples& samples, size_t iterations, size_t blockSize, const std::vector<TType*> types) - : Context(ctx) - , Samples(samples) - , End(Samples.end()) - , MaxIterations(iterations) - , CurrSample(Samples.begin()) - , BlockSize(blockSize) - , Types(types) - { - Y_ENSURE(samples.size() > 0); - } - -private: - TComputationContext& Context; - const TSamples& Samples; - const TSamples::const_iterator End; - const size_t MaxIterations; - TSamples::const_iterator CurrSample; - size_t Iteration = 0; - size_t BlockSize = 1; - const std::vector<TType*> Types; - - const TSamples::value_type* NextSample() { - if (CurrSample == End) { - ++Iteration; - if (Iteration >= MaxIterations) { - return nullptr; - } - CurrSample = Samples.begin(); - } - return CurrSample++; - } - - bool IsAtTheEnd() const { - return (CurrSample == End) && (Iteration >= MaxIterations); - } - -public: - NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) final override { - if (width != 3) { - ythrow yexception() << "width 3 expected"; - } - - TVector<std::unique_ptr<IArrayBuilder>> builders; - std::transform(Types.cbegin(), Types.cend(), std::back_inserter(builders), - [&](const auto& type) { - return MakeArrayBuilder(TTypeInfoHelper(), type, Context.ArrowMemoryPool, BlockSize, &Context.Builder->GetPgBuilder()); - }); - - size_t count = 0; - for (; count < BlockSize; ++count) { - const auto nextTuple = NextSample(); - if (!nextTuple) { - break; - } - - NUdf::TUnboxedValue key; - NativeToUnboxed<false>(nextTuple->first, key); - builders[0]->Add(key); - - NUdf::TUnboxedValue value; - NativeToUnboxed<false>(nextTuple->second, value); - builders[1]->Add(value); - } - - if (count > 0) { - const bool finish = IsAtTheEnd(); - result[0] = Context.HolderFactory.CreateArrowBlock(builders[0]->Build(finish)); - result[1] = Context.HolderFactory.CreateArrowBlock(builders[1]->Build(finish)); - result[2] = Context.HolderFactory.CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(count))); - - return NUdf::EFetchStatus::Ok; - } else { - return NUdf::EFetchStatus::Finish; - } - } -}; - +template<> void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<uint64_t, uint64_t>& result) { auto datumKey = TArrowBlock::From(key).GetDatum(); @@ -149,20 +45,55 @@ void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedVal } } +template<> +void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<std::string, uint64_t>& result) +{ + auto datumKey = TArrowBlock::From(key).GetDatum(); + auto datumValue = TArrowBlock::From(value).GetDatum(); + UNIT_ASSERT(datumKey.is_arraylike()); + UNIT_ASSERT(datumValue.is_array()); + + const auto ui64values = datumValue.template array_as<arrow::UInt64Array>(); + UNIT_ASSERT(!!ui64values); + + int64_t valueOffset = 0; + + for (const auto& chunk : datumKey.chunks()) { + auto* barray = dynamic_cast<arrow::BinaryArray*>(chunk.get()); + UNIT_ASSERT(barray != nullptr); + for (int64_t i = 0; i < barray->length(); ++i) { + auto key = barray->GetString(i); + auto val = ui64values->Value(valueOffset); + result[key] += val; + ++valueOffset; + } + } +} + + +template<typename TStream, typename TMap> +void CalcRefResult(TStream& refStream, TMap& refResult) +{ + NUdf::TUnboxedValue columns[3]; + + while (refStream->WideFetch(columns, 3) == NUdf::EFetchStatus::Ok) { + UpdateMapFromBlocks(columns[0], columns[1], refResult); + } +} + + template<bool LLVM, bool Spilling> -void RunTestBlockCombineHashedSimple(const TRunParams& params) +void RunTestBlockCombineHashedSimple(const TRunParams& params, TTestResultCollector& printout) { TSetup<LLVM, Spilling> setup(GetPerfTestFactory()); - Cerr << "Data rows total: " << params.RowsPerRun << " x " << params.NumRuns << Endl; - Cerr << (params.MaxKey + 1) << " distinct numeric keys" << Endl; - Cerr << "Block size: " << params.BlockSize << Endl; - - auto samples = MakeKeyed6464SamplesForBlocks(params.RowsPerRun, params.MaxKey); + printout.SubmitTestNameAndParams(params, __func__, LLVM, Spilling); + + auto samples = MakeKeyedString64Samples(params.RowsPerRun, params.MaxKey, false); TProgramBuilder& pb = *setup.PgmBuilder; - auto keyBaseType = pb.NewDataType(NUdf::TDataType<ui64>::Id); + auto keyBaseType = pb.NewDataType(NUdf::TDataType<char*>::Id); auto valueBaseType = pb.NewDataType(NUdf::TDataType<ui64>::Id); auto keyBlockType = pb.NewBlockType(keyBaseType, TBlockType::EShape::Many); auto valueBlockType = pb.NewBlockType(valueBaseType, TBlockType::EShape::Many); @@ -194,7 +125,7 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params) const auto graph = setup.BuildGraph(pgmReturn, {streamCallable}); auto streamMaker = [&]() -> auto { - return std::unique_ptr<TBlockStream<uint64_t, uint64_t>>(new TBlockStream<uint64_t, uint64_t>( + return std::unique_ptr<TBlockKVStream<std::string, uint64_t>>(new TBlockKVStream<std::string, uint64_t>( graph->GetContext(), samples, params.NumRuns, @@ -203,24 +134,30 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params) )); }; - std::unordered_map<uint64_t, uint64_t> rawResult; + // Compute results directly from raw samples to test the input stream implementation + std::unordered_map<std::string, uint64_t> rawResult; for (const auto& tuple : samples) { - rawResult[tuple.first] += (tuple.second * params.NumRuns); + rawResult[tuple.first] += (tuple.second * params.NumRuns); } - std::unordered_map<uint64_t, uint64_t> refResult; - const auto refStream = streamMaker(); - - const auto cppStart = TInstant::Now(); + // Measure the input stream run time + const auto devnullStream = streamMaker(); + const auto devnullStart = TInstant::Now(); { NUdf::TUnboxedValue columns[3]; - - while (refStream->WideFetch(columns, 3) == NUdf::EFetchStatus::Ok) { - UpdateMapFromBlocks(columns[0], columns[1], refResult); + while (devnullStream->WideFetch(columns, 3) == NUdf::EFetchStatus::Ok) { } } + const auto devnullTime = TInstant::Now() - devnullStart; + + // Reference implementation (sum via an std::unordered_map) + std::unordered_map<std::string, uint64_t> refResult; + const auto refStream = streamMaker(); + const auto cppStart = TInstant::Now(); + CalcRefResult(refStream, refResult); const auto cppTime = TInstant::Now() - cppStart; + // Verify the reference stream implementation against the raw samples UNIT_ASSERT_VALUES_EQUAL(refResult.size(), rawResult.size()); for (const auto& tuple : rawResult) { auto otherIt = refResult.find(tuple.first); @@ -228,6 +165,7 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params) UNIT_ASSERT_VALUES_EQUAL(tuple.second, otherIt->second); } + // Compute graph implementation auto stream = NUdf::TUnboxedValuePod(streamMaker().release()); graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(stream)); const auto graphStart = TInstant::Now(); @@ -235,16 +173,18 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params) const auto graphTime = TInstant::Now() - graphStart; size_t numResultItems = resultList.GetListLength(); + Cerr << "Result block count: " << numResultItems << Endl; - std::unordered_map<uint64_t, uint64_t> graphResult; + // Verification + std::unordered_map<std::string, uint64_t> graphResult; const auto ptr = resultList.GetElements(); - for (size_t i = 0ULL; i < numResultItems; ++i) { + for (size_t i = 0ULL; i < numResultItems; ++i) { UNIT_ASSERT(ptr[i].GetListLength() >= 2); const auto elements = ptr[i].GetElements(); - - UpdateMapFromBlocks(elements[0], elements[1], graphResult); + + UpdateMapFromBlocks(elements[0], elements[1], graphResult); } UNIT_ASSERT_VALUES_EQUAL(refResult.size(), graphResult.size()); @@ -254,13 +194,15 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params) UNIT_ASSERT_VALUES_EQUAL(tuple.second, graphIt->second); } - Cerr << "BlockCombineHashed graph runtime is: " << graphTime << " vs. reference C++ implementation: " << cppTime << "" << Endl << Endl; + Cerr << "Graph time raw: " << graphTime << Endl; + Cerr << "CPP time raw: " << cppTime << Endl; + printout.SubmitTimings(graphTime, cppTime, devnullTime); } -template void RunTestBlockCombineHashedSimple<false, false>(const TRunParams& params); -template void RunTestBlockCombineHashedSimple<false, true>(const TRunParams& params); -template void RunTestBlockCombineHashedSimple<true, false>(const TRunParams& params); -template void RunTestBlockCombineHashedSimple<true, true>(const TRunParams& params); +template void RunTestBlockCombineHashedSimple<false, false>(const TRunParams& params, TTestResultCollector& printout); +template void RunTestBlockCombineHashedSimple<false, true>(const TRunParams& params, TTestResultCollector& printout); +template void RunTestBlockCombineHashedSimple<true, false>(const TRunParams& params, TTestResultCollector& printout); +template void RunTestBlockCombineHashedSimple<true, true>(const TRunParams& params, TTestResultCollector& printout); } diff --git a/ydb/core/kqp/tools/combiner_perf/simple_block.h b/ydb/core/kqp/tools/combiner_perf/simple_block.h index b05c5788c4..e57554a10a 100644 --- a/ydb/core/kqp/tools/combiner_perf/simple_block.h +++ b/ydb/core/kqp/tools/combiner_perf/simple_block.h @@ -1,12 +1,13 @@ #pragma once #include "run_params.h" +#include "printout.h" namespace NKikimr { namespace NMiniKQL { template<bool LLVM, bool Spilling> -void RunTestBlockCombineHashedSimple(const TRunParams& params); +void RunTestBlockCombineHashedSimple(const TRunParams& params, TTestResultCollector& printout); } } diff --git a/ydb/core/kqp/tools/combiner_perf/simple_last.cpp b/ydb/core/kqp/tools/combiner_perf/simple_last.cpp index 1bf311a17f..fe880637c7 100644 --- a/ydb/core/kqp/tools/combiner_perf/simple_last.cpp +++ b/ydb/core/kqp/tools/combiner_perf/simple_last.cpp @@ -2,6 +2,8 @@ #include "factories.h" #include "converters.h" +#include "streams.h" +#include "printout.h" #include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h> #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h> @@ -15,251 +17,19 @@ namespace NKikimr { namespace NMiniKQL { -using T6464Samples = std::vector<std::pair<ui64, ui64>>; -using TString64Samples = std::vector<std::pair<std::string, ui64>>; - -T6464Samples MakeKeyed6464Samples(const size_t numSamples, const unsigned int maxKey) { - std::default_random_engine eng; - std::uniform_int_distribution<unsigned int> keys(0, maxKey); - std::uniform_int_distribution<ui64> unif(0, 100000.0); - - T6464Samples samples(numSamples); - - eng.seed(std::time(nullptr)); - std::generate(samples.begin(), samples.end(), - [&]() -> auto { - return std::make_pair<ui64, ui64>(keys(eng), unif(eng)); - } - ); - - return samples; -} - -TString64Samples MakeKeyedString64Samples(const size_t numSamples, const unsigned int maxKey, const bool longStrings) { - std::default_random_engine eng; - std::uniform_int_distribution<unsigned int> keys(0, maxKey); - std::uniform_int_distribution<ui64> unif(0, 100000.0); - - TString64Samples samples(numSamples); - - eng.seed(std::time(nullptr)); - std::generate(samples.begin(), samples.end(), - [&]() -> auto { - auto key = keys(eng); - std::string strKey; - if (!longStrings) { - strKey = std::string(ToString(key)); - } else { - strKey = Sprintf("%07u.%07u.%07u.", key, key, key); - } - return std::make_pair<std::string, ui64>(std::move(strKey), unif(eng)); - } - ); - - return samples; -} - -struct IWideStream : public NUdf::TBoxedValue -{ - NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final { - Y_UNUSED(result); - ythrow yexception() << "only WideFetch is supported here"; - } - - NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) = 0; -}; - -template<typename K, typename V, bool EmbeddedKeys> -struct TStream : public IWideStream { - using TSamples = std::vector<std::pair<K, V>>; - - TStream(const THolderFactory& holderFactory, const TSamples& samples, size_t iterations) - : HolderFactory(holderFactory) - , Samples(samples) - , End(Samples.end()) - , MaxIterations(iterations) - , CurrSample(Samples.begin()) - { - Y_ENSURE(samples.size() > 0); - } - -private: - const THolderFactory& HolderFactory; - const TSamples& Samples; - const TSamples::const_iterator End; - const size_t MaxIterations; - TSamples::const_iterator CurrSample = 0; - size_t Iteration = 0; - -public: - NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) final { - Y_UNUSED(HolderFactory); // for future use - - if (CurrSample == End) { - ++Iteration; - if (Iteration >= MaxIterations) { - return NUdf::EFetchStatus::Finish; - } - CurrSample = Samples.begin(); - } - - if (width != 2) { - ythrow yexception() << "width 2 expected"; - } - - // TODO: support embedded strings in values? - NativeToUnboxed<EmbeddedKeys>(CurrSample->first, result[0]); - NativeToUnboxed<false>(CurrSample->second, result[1]); - - ++CurrSample; - - return NUdf::EFetchStatus::Ok; - } -}; - -template<typename K, typename V> -std::unordered_map<K, V> ComputeSumReferenceResult(IWideStream& referenceStream) -{ - std::unordered_map<K, V> expects; - { - NUdf::TUnboxedValue inputs[2]; - - while (referenceStream.WideFetch(inputs, 2) == NUdf::EFetchStatus::Ok) { - expects[UnboxedToNative<K>(inputs[0])] += inputs[1].Get<V>(); - } - } - return expects; -} - -template<typename K, typename V> -void VerifyListVsUnorderedMap(const NUdf::TUnboxedValue& pairList, const std::unordered_map<K, V>& map) -{ - UNIT_ASSERT_VALUES_EQUAL(pairList.GetListLength(), map.size()); - - const auto ptr = pairList.GetElements(); - for (size_t i = 0ULL; i < map.size(); ++i) { - const auto elements = ptr[i].GetElements(); - const auto key = UnboxedToNative<K>(elements[0]); - const auto value = UnboxedToNative<V>(elements[1]); - const auto ii = map.find(key); - UNIT_ASSERT(ii != map.end()); - UNIT_ASSERT_VALUES_EQUAL(value, ii->second); - } -} - -class IDataSampler -{ - virtual THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const = 0; - virtual TType* GetKeyType(TProgramBuilder& pb) const = 0; - virtual void ComputeReferenceResult(IWideStream& referenceStream) = 0; - virtual void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const = 0; - virtual std::string Describe() const = 0; -}; - -class T6464DataSampler : public IDataSampler -{ -public: - TStream<ui64, ui64, false>::TSamples Samples; - std::unordered_map<ui64, ui64> Expects; - - size_t StreamNumIters = 0; - - T6464DataSampler(size_t numSamples, size_t maxKey, size_t numIters) - : Samples(MakeKeyed6464Samples(numSamples, maxKey)) - , StreamNumIters(numIters) - { - } - - THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override - { - return THolder(new TStream<ui64, ui64, false>(holderFactory, Samples, StreamNumIters)); - } - - TType* GetKeyType(TProgramBuilder& pb) const override - { - return pb.NewDataType(NUdf::TDataType<ui64>::Id); - } - - void ComputeReferenceResult(IWideStream& referenceStream) override - { - Y_ENSURE(Expects.empty()); - - Expects = ComputeSumReferenceResult<ui64, ui64>(referenceStream); - } - - void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override - { - VerifyListVsUnorderedMap(value, Expects); - } - - std::string Describe() const override - { - return "ui64 keys, ui64 values"; - } -}; - -class TString64DataSampler : public IDataSampler -{ -public: - TStream<std::string, ui64, false>::TSamples Samples; - std::unordered_map<std::string, ui64> Expects; - - size_t StreamNumIters = 0; - bool LongStrings = false; - - TString64DataSampler(size_t numSamples, size_t maxKey, size_t numIters, bool longStrings) - : Samples(MakeKeyedString64Samples(numSamples, maxKey, longStrings)) - , StreamNumIters(numIters) - , LongStrings(longStrings) - { - } - - THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override - { - if (LongStrings) { - return THolder(new TStream<std::string, ui64, false>(holderFactory, Samples, StreamNumIters)); - } else { - return THolder(new TStream<std::string, ui64, true>(holderFactory, Samples, StreamNumIters)); - } - } - - TType* GetKeyType(TProgramBuilder& pb) const override - { - return pb.NewDataType(NUdf::TDataType<char*>::Id); - } - - void ComputeReferenceResult(IWideStream& referenceStream) override - { - Y_ENSURE(Expects.empty()); - - Expects = ComputeSumReferenceResult<std::string, ui64>(referenceStream); - } - - void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override - { - VerifyListVsUnorderedMap(value, Expects); - } - - std::string Describe() const override - { - return Sprintf("%s string keys, ui64 values", LongStrings ? "Long (24 byte)" : "Embedded"); - } -}; - template<bool LLVM, bool Spilling> -void RunTestCombineLastSimple(const TRunParams& params) +void RunTestCombineLastSimple(const TRunParams& params, TTestResultCollector& printout) { TSetup<LLVM, Spilling> setup(GetPerfTestFactory()); + printout.SubmitTestNameAndParams(params, __func__, LLVM, Spilling); + const size_t numSamples = params.RowsPerRun; const size_t numIters = params.NumRuns; // Will process numSamples * numIters items const size_t maxKey = params.MaxKey; // maxKey + 1 distinct keys, each key multiplied by 64 for some reason - Cerr << "Data rows total: " << numSamples << " x " << numIters << Endl; - Cerr << (maxKey + 1) << " distinct numeric keys" << Endl; - - // TString64DataSampler sampler(numSamples, maxKey, numIters, params.LongStringKeys); - T6464DataSampler sampler(numSamples, maxKey, numIters); + TString64DataSampler sampler(numSamples, maxKey, numIters, params.LongStringKeys); + // or T6464DataSampler sampler(numSamples, maxKey, numIters); -- maybe make selectable from params Cerr << "Sampler type: " << sampler.Describe() << Endl; TProgramBuilder& pb = *setup.PgmBuilder; @@ -294,6 +64,16 @@ void RunTestCombineLastSimple(const TRunParams& params) graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>(); } + // Measure the input stream run time + const auto devnullStream = sampler.MakeStream(graph->GetHolderFactory()); + const auto devnullStart = TInstant::Now(); + { + NUdf::TUnboxedValue columns[2]; + while (devnullStream->WideFetch(columns, 2) == NUdf::EFetchStatus::Ok) { + } + } + const auto devnullTime = TInstant::Now() - devnullStart; + // Reference implementation (sum via an std::unordered_map) auto referenceStream = sampler.MakeStream(graph->GetHolderFactory()); const auto t = TInstant::Now(); @@ -306,18 +86,18 @@ void RunTestCombineLastSimple(const TRunParams& params) const auto t1 = TInstant::Now(); const auto& value = graph->GetValue(); - const auto t2 = TInstant::Now(); + const auto graphTime = TInstant::Now() - t1; // Verification sampler.VerifyComputedValueVsReference(value); - Cerr << "WideLastCombiner graph runtime is: " << t2 - t1 << " vs. reference C++ implementation: " << cppTime << "" << Endl << Endl; + printout.SubmitTimings(graphTime, cppTime, devnullTime); } -template void RunTestCombineLastSimple<false, false>(const TRunParams& params); -template void RunTestCombineLastSimple<false, true>(const TRunParams& params); -template void RunTestCombineLastSimple<true, false>(const TRunParams& params); -template void RunTestCombineLastSimple<true, true>(const TRunParams& params); +template void RunTestCombineLastSimple<false, false>(const TRunParams& params, TTestResultCollector& printout); +template void RunTestCombineLastSimple<false, true>(const TRunParams& params, TTestResultCollector& printout); +template void RunTestCombineLastSimple<true, false>(const TRunParams& params, TTestResultCollector& printout); +template void RunTestCombineLastSimple<true, true>(const TRunParams& params, TTestResultCollector& printout); } } diff --git a/ydb/core/kqp/tools/combiner_perf/simple_last.h b/ydb/core/kqp/tools/combiner_perf/simple_last.h index a630fd82c5..90433f8141 100644 --- a/ydb/core/kqp/tools/combiner_perf/simple_last.h +++ b/ydb/core/kqp/tools/combiner_perf/simple_last.h @@ -1,12 +1,13 @@ #pragma once #include "run_params.h" +#include "printout.h" namespace NKikimr { namespace NMiniKQL { template<bool LLVM, bool Spilling> -void RunTestCombineLastSimple(const TRunParams& params); +void RunTestCombineLastSimple(const TRunParams& params, TTestResultCollector& printout); } } diff --git a/ydb/core/kqp/tools/combiner_perf/streams.cpp b/ydb/core/kqp/tools/combiner_perf/streams.cpp new file mode 100644 index 0000000000..7f0a662250 --- /dev/null +++ b/ydb/core/kqp/tools/combiner_perf/streams.cpp @@ -0,0 +1,48 @@ +#include "streams.h" + +namespace NKikimr { +namespace NMiniKQL { + +T6464Samples MakeKeyed6464Samples(const size_t numSamples, const unsigned int maxKey) { + std::default_random_engine eng; + std::uniform_int_distribution<unsigned int> keys(0, maxKey); + std::uniform_int_distribution<uint64_t> unif(0, 100000.0); + + T6464Samples samples(numSamples); + + eng.seed(std::time(nullptr)); + std::generate(samples.begin(), samples.end(), + [&]() -> auto { + return std::make_pair<uint64_t, uint64_t>(keys(eng), unif(eng)); + } + ); + + return samples; +} + +TString64Samples MakeKeyedString64Samples(const size_t numSamples, const unsigned int maxKey, const bool longStrings) { + std::default_random_engine eng; + std::uniform_int_distribution<unsigned int> keys(0, maxKey); + std::uniform_int_distribution<uint64_t> unif(0, 100000.0); + + TString64Samples samples(numSamples); + + eng.seed(std::time(nullptr)); + std::generate(samples.begin(), samples.end(), + [&]() -> auto { + auto key = keys(eng); + std::string strKey; + if (!longStrings) { + strKey = std::string(ToString(key)); + } else { + strKey = Sprintf("%07u.%07u.%07u.", key, key, key); + } + return std::make_pair<std::string, uint64_t>(std::move(strKey), unif(eng)); + } + ); + + return samples; +} + +} +}
\ No newline at end of file diff --git a/ydb/core/kqp/tools/combiner_perf/streams.h b/ydb/core/kqp/tools/combiner_perf/streams.h new file mode 100644 index 0000000000..cb955849f9 --- /dev/null +++ b/ydb/core/kqp/tools/combiner_perf/streams.h @@ -0,0 +1,290 @@ +// Sample data generators and streams + +#pragma once + +#include "converters.h" + +#include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h> +#include <yql/essentials/minikql/computation/mkql_block_builder.h> + +namespace NKikimr { +namespace NMiniKQL { + +using T6464Samples = std::vector<std::pair<ui64, ui64>>; +T6464Samples MakeKeyed6464Samples(const size_t numSamples, const unsigned int maxKey); + +using TString64Samples = std::vector<std::pair<std::string, ui64>>; +TString64Samples MakeKeyedString64Samples(const size_t numSamples, const unsigned int maxKey, const bool longStrings); + +struct IWideStream : public NUdf::TBoxedValue +{ + NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final override { + Y_UNUSED(result); + ythrow yexception() << "only WideFetch is supported here"; + } + + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) override = 0; +}; + +template<typename K, typename V, bool EmbeddedKeys> +struct TKVStream : public IWideStream { + using TSamples = std::vector<std::pair<K, V>>; + + TKVStream(const THolderFactory& holderFactory, const TSamples& samples, size_t iterations) + : HolderFactory(holderFactory) + , Samples(samples) + , End(Samples.end()) + , MaxIterations(iterations) + , CurrSample(Samples.begin()) + { + Y_ENSURE(samples.size() > 0); + } + +private: + const THolderFactory& HolderFactory; + const TSamples& Samples; + const TSamples::const_iterator End; + const size_t MaxIterations; + TSamples::const_iterator CurrSample = 0; + size_t Iteration = 0; + +public: + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) final { + Y_UNUSED(HolderFactory); // for future use + + if (CurrSample == End) { + ++Iteration; + if (Iteration >= MaxIterations) { + return NUdf::EFetchStatus::Finish; + } + CurrSample = Samples.begin(); + } + + if (width != 2) { + ythrow yexception() << "width 2 expected"; + } + + // TODO: support embedded strings in values? + NativeToUnboxed<EmbeddedKeys>(CurrSample->first, result[0]); + NativeToUnboxed<false>(CurrSample->second, result[1]); + + ++CurrSample; + + return NUdf::EFetchStatus::Ok; + } +}; + +template<typename K, typename V> +std::unordered_map<K, V> ComputeSumReferenceResult(IWideStream& referenceStream) +{ + std::unordered_map<K, V> expects; + { + NUdf::TUnboxedValue inputs[2]; + + while (referenceStream.WideFetch(inputs, 2) == NUdf::EFetchStatus::Ok) { + expects[UnboxedToNative<K>(inputs[0])] += inputs[1].Get<V>(); + } + } + return expects; +} + +template<typename K, typename V> +void VerifyListVsUnorderedMap(const NUdf::TUnboxedValue& pairList, const std::unordered_map<K, V>& map) +{ + UNIT_ASSERT_VALUES_EQUAL(pairList.GetListLength(), map.size()); + + const auto ptr = pairList.GetElements(); + for (size_t i = 0ULL; i < map.size(); ++i) { + const auto elements = ptr[i].GetElements(); + const auto key = UnboxedToNative<K>(elements[0]); + const auto value = UnboxedToNative<V>(elements[1]); + const auto ii = map.find(key); + UNIT_ASSERT(ii != map.end()); + UNIT_ASSERT_VALUES_EQUAL(value, ii->second); + } +} + +class IDataSampler +{ + virtual THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const = 0; + virtual TType* GetKeyType(TProgramBuilder& pb) const = 0; + virtual void ComputeReferenceResult(IWideStream& referenceStream) = 0; + virtual void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const = 0; + virtual std::string Describe() const = 0; +}; + +class T6464DataSampler : public IDataSampler +{ +public: + TKVStream<ui64, ui64, false>::TSamples Samples; + std::unordered_map<ui64, ui64> Expects; + + size_t StreamNumIters = 0; + + T6464DataSampler(size_t numSamples, size_t maxKey, size_t numIters) + : Samples(MakeKeyed6464Samples(numSamples, maxKey)) + , StreamNumIters(numIters) + { + } + + THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override + { + return THolder(new TKVStream<ui64, ui64, false>(holderFactory, Samples, StreamNumIters)); + } + + TType* GetKeyType(TProgramBuilder& pb) const override + { + return pb.NewDataType(NUdf::TDataType<ui64>::Id); + } + + void ComputeReferenceResult(IWideStream& referenceStream) override + { + Y_ENSURE(Expects.empty()); + + Expects = ComputeSumReferenceResult<ui64, ui64>(referenceStream); + } + + void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override + { + VerifyListVsUnorderedMap(value, Expects); + } + + std::string Describe() const override + { + return "ui64 keys, ui64 values"; + } +}; + +class TString64DataSampler : public IDataSampler +{ +public: + TKVStream<std::string, ui64, false>::TSamples Samples; + std::unordered_map<std::string, ui64> Expects; + + size_t StreamNumIters = 0; + bool LongStrings = false; + + TString64DataSampler(size_t numSamples, size_t maxKey, size_t numIters, bool longStrings) + : Samples(MakeKeyedString64Samples(numSamples, maxKey, longStrings)) + , StreamNumIters(numIters) + , LongStrings(longStrings) + { + } + + THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override + { + if (LongStrings) { + return THolder(new TKVStream<std::string, ui64, false>(holderFactory, Samples, StreamNumIters)); + } else { + return THolder(new TKVStream<std::string, ui64, true>(holderFactory, Samples, StreamNumIters)); + } + } + + TType* GetKeyType(TProgramBuilder& pb) const override + { + return pb.NewDataType(NUdf::TDataType<char*>::Id); + } + + void ComputeReferenceResult(IWideStream& referenceStream) override + { + Y_ENSURE(Expects.empty()); + + Expects = ComputeSumReferenceResult<std::string, ui64>(referenceStream); + } + + void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override + { + VerifyListVsUnorderedMap(value, Expects); + } + + std::string Describe() const override + { + return Sprintf("%s string keys, ui64 values", LongStrings ? "Long (24 byte)" : "Embedded"); + } +}; + + +template<typename K, typename V> +struct TBlockKVStream : public IWideStream { + using TSamples = std::vector<std::pair<K, V>>; + + TBlockKVStream(TComputationContext& ctx, const TSamples& samples, size_t iterations, size_t blockSize, const std::vector<TType*> types) + : Context(ctx) + , Samples(samples) + , End(Samples.end()) + , MaxIterations(iterations) + , CurrSample(Samples.begin()) + , BlockSize(blockSize) + , Types(types) + { + Y_ENSURE(samples.size() > 0); + } + +private: + TComputationContext& Context; + const TSamples& Samples; + const TSamples::const_iterator End; + const size_t MaxIterations; + TSamples::const_iterator CurrSample; + size_t Iteration = 0; + size_t BlockSize = 1; + const std::vector<TType*> Types; + + const TSamples::value_type* NextSample() { + if (CurrSample == End) { + ++Iteration; + if (Iteration >= MaxIterations) { + return nullptr; + } + CurrSample = Samples.begin(); + } + return CurrSample++; + } + + bool IsAtTheEnd() const { + return (CurrSample == End) && (Iteration >= MaxIterations); + } + +public: + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) final override { + if (width != 3) { + ythrow yexception() << "width 3 expected"; + } + + TVector<std::unique_ptr<IArrayBuilder>> builders; + std::transform(Types.cbegin(), Types.cend(), std::back_inserter(builders), + [&](const auto& type) { + return MakeArrayBuilder(TTypeInfoHelper(), type, Context.ArrowMemoryPool, BlockSize, &Context.Builder->GetPgBuilder()); + }); + + size_t count = 0; + for (; count < BlockSize; ++count) { + const auto nextTuple = NextSample(); + if (!nextTuple) { + break; + } + + NUdf::TUnboxedValue key; + NativeToUnboxed<false>(nextTuple->first, key); + builders[0]->Add(key); + + NUdf::TUnboxedValue value; + NativeToUnboxed<false>(nextTuple->second, value); + builders[1]->Add(value); + } + + if (count > 0) { + const bool finish = IsAtTheEnd(); + result[0] = Context.HolderFactory.CreateArrowBlock(builders[0]->Build(finish)); + result[1] = Context.HolderFactory.CreateArrowBlock(builders[1]->Build(finish)); + result[2] = Context.HolderFactory.CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(count))); + + return NUdf::EFetchStatus::Ok; + } else { + return NUdf::EFetchStatus::Finish; + } + } +}; + +} +} diff --git a/ydb/core/kqp/tools/combiner_perf/ya.make b/ydb/core/kqp/tools/combiner_perf/ya.make index 567a9ccf29..d2e15dca53 100644 --- a/ydb/core/kqp/tools/combiner_perf/ya.make +++ b/ydb/core/kqp/tools/combiner_perf/ya.make @@ -41,12 +41,13 @@ CFLAGS( ENDIF() SRCS( - factories.cpp converters.cpp + factories.cpp simple.cpp + simple_block.cpp simple_last.cpp + streams.cpp tpch_last.cpp - simple_block.cpp ) END() |