aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPavel Zuev <uniquelogin@ydb.tech>2025-04-03 21:27:38 +0500
committerGitHub <noreply@github.com>2025-04-03 16:27:38 +0000
commitd2c5ec0ba9115cf7dcb2c6f1525677361c8898cc (patch)
tree63970175f52ee05f303992d7e0eeb44f096592ce
parent54026678e90cc945c55076444303cebd36837d0e (diff)
downloadydb-d2c5ec0ba9115cf7dcb2c6f1525677361c8898cc.tar.gz
update all tests to use <string, ui64> tuples; add markdown-table result printer (#16746)
-rw-r--r--ydb/core/kqp/tools/combiner_perf/bin/main.cpp167
-rw-r--r--ydb/core/kqp/tools/combiner_perf/bin/ya.make1
-rw-r--r--ydb/core/kqp/tools/combiner_perf/converters.h2
-rw-r--r--ydb/core/kqp/tools/combiner_perf/printout.h15
-rw-r--r--ydb/core/kqp/tools/combiner_perf/run_params.h1
-rw-r--r--ydb/core/kqp/tools/combiner_perf/simple.cpp106
-rw-r--r--ydb/core/kqp/tools/combiner_perf/simple.h5
-rw-r--r--ydb/core/kqp/tools/combiner_perf/simple_block.cpp216
-rw-r--r--ydb/core/kqp/tools/combiner_perf/simple_block.h3
-rw-r--r--ydb/core/kqp/tools/combiner_perf/simple_last.cpp266
-rw-r--r--ydb/core/kqp/tools/combiner_perf/simple_last.h3
-rw-r--r--ydb/core/kqp/tools/combiner_perf/streams.cpp48
-rw-r--r--ydb/core/kqp/tools/combiner_perf/streams.h290
-rw-r--r--ydb/core/kqp/tools/combiner_perf/ya.make5
14 files changed, 652 insertions, 476 deletions
diff --git a/ydb/core/kqp/tools/combiner_perf/bin/main.cpp b/ydb/core/kqp/tools/combiner_perf/bin/main.cpp
index 6894fbc09d..ef62a36e6c 100644
--- a/ydb/core/kqp/tools/combiner_perf/bin/main.cpp
+++ b/ydb/core/kqp/tools/combiner_perf/bin/main.cpp
@@ -1,3 +1,4 @@
+#include <ydb/core/kqp/tools/combiner_perf/printout.h>
#include <ydb/core/kqp/tools/combiner_perf/simple_last.h>
#include <ydb/core/kqp/tools/combiner_perf/simple.h>
#include <ydb/core/kqp/tools/combiner_perf/tpch_last.h>
@@ -7,61 +8,155 @@
#include <util/stream/output.h>
#include <util/stream/file.h>
+#include <util/string/printf.h>
#include <util/system/compiler.h>
-void DoFullPass(bool withSpilling)
+using NKikimr::NMiniKQL::TRunParams;
+
+class TPrintingResultCollector : public TTestResultCollector
{
- using namespace NKikimr::NMiniKQL;
+public:
+ virtual void SubmitTestNameAndParams(const TRunParams& runParams, const char* testName, const std::optional<bool> llvm, const std::optional<bool> spilling) override
+ {
+ Cout << "------------------------------" << Endl;
+ Cout << testName;
+ if (llvm.has_value()) {
+ Cout << ", " << (llvm.value() ? "+" : "-") << "llvm";
+ }
+ if (spilling.has_value()) {
+ Cout << ", " << (spilling.value() ? "+" : "-") << "spilling";
+ }
+ Cout << Endl;
+ Cout << "Data rows total: " << runParams.RowsPerRun << " x " << runParams.NumRuns << Endl;
+ Cout << (runParams.MaxKey + 1) << " distinct numeric keys" << Endl;
+ Cout << "Block size: " << runParams.BlockSize << Endl;
+ Cout << "Long strings: " << (runParams.LongStringKeys ? "yes" : "no") << Endl;
+ Cout << Endl;
+ }
- TRunParams runParams;
+ virtual void SubmitTimings(const TDuration& graphTime, const TDuration& referenceTime, const std::optional<TDuration> streamTime) override
+ {
+ Cout << "Graph runtime is: " << graphTime << " vs. reference C++ implementation: " << referenceTime << Endl;
- runParams.NumRuns = 20;
- runParams.RowsPerRun = 5'000'000;
- runParams.MaxKey = 200'000 - 1;
- runParams.BlockSize = 5'000;
- runParams.LongStringKeys = true;
+ if (streamTime.has_value()) {
+ Cout << "Input stream own iteration time: " << *streamTime << Endl;
+ Cout << "Graph time - stream own time = " << (*streamTime <= graphTime ? graphTime - *streamTime : TDuration::Zero()) << Endl;
+ Cout << "C++ implementation time - devnull time = " << (*streamTime <= referenceTime ? referenceTime - *streamTime : TDuration::Zero()) << Endl;
+ }
+ }
+};
- RunTestBlockCombineHashedSimple<false, false>(runParams);
+class TWikiResultCollector : public TTestResultCollector
+{
+public:
+ TWikiResultCollector()
+ {
+ Cout << "#|" << Endl;
+ Cout << "|| Test name | LLVM | Spilling | RowsTotal | Distinct keys | Block size | Input stream own time (s) | Graph time - stream time (s) | C++ time - stream time (s) | Shame ratio ||" << Endl;
+ }
- auto doSimpleLast = [](const TRunParams& params) {
- Cerr << "LastSimple, -llvm, -spilling" << Endl;
- NKikimr::NMiniKQL::RunTestCombineLastSimple<false, false>(params);
+ ~TWikiResultCollector()
+ {
+ Cout << "|#" << Endl;
+ }
- if (false) {
- Cerr << "LastSimple, +llvm, -spilling" << Endl;
- NKikimr::NMiniKQL::RunTestCombineLastSimple<true, false>(params);
+ virtual void SubmitTestNameAndParams(const TRunParams& runParams, const char* testName, const std::optional<bool> llvm, const std::optional<bool> spilling) override
+ {
+ Cout << "|| ";
+ Cout << testName << " | ";
+ if (llvm.has_value()) {
+ Cout << (llvm.value() ? "+" : " ");
}
- };
+ Cout << " | ";
+ if (spilling.has_value()) {
+ Cout << (spilling.value() ? "+" : " ");
+ }
+ Cout << " | ";
- doSimpleLast(runParams);
+ Cout << (runParams.RowsPerRun * runParams.NumRuns) << " | " << (runParams.MaxKey + 1) << " | ";
+ if (TStringBuf(testName).Contains("Block")) {
+ Cout << runParams.BlockSize;
+ }
+ Cout << " | ";
+ }
- if (false) {
- Cerr << "Simple, -llvm, -spilling" << Endl;
- NKikimr::NMiniKQL::RunTestSimple<false>();
+ static TString FancyDuration(const TDuration duration)
+ {
+ const auto ms = duration.MilliSeconds();
+ if (!ms) {
+ return " ";
+ }
+ return Sprintf("%.2f", (ms / 1000.0));
+ }
- Cerr << "Simple, +llvm, -spilling" << Endl;
- NKikimr::NMiniKQL::RunTestSimple<true>();
+ virtual void SubmitTimings(const TDuration& graphTime, const TDuration& referenceTime, const std::optional<TDuration> streamTime) override
+ {
+ TDuration streamTimeOrZero = (streamTime.has_value()) ? streamTime.value() : TDuration::Zero();
+ TDuration corrGraphTime = streamTimeOrZero <= graphTime ? graphTime - streamTimeOrZero : TDuration::Zero();
+ TDuration corrRefTime = streamTimeOrZero <= referenceTime ? referenceTime - streamTimeOrZero : TDuration::Zero();
- Cerr << "LastTpch, -llvm, -spilling" << Endl;
- NKikimr::NMiniKQL::RunTestLastTpch<false, false>();
+ TString diff;
+ if (corrRefTime.MilliSeconds() > 0) {
+ diff = Sprintf("%.2f", corrGraphTime.MilliSeconds() * 1.0 / corrRefTime.MilliSeconds());
+ }
- Cerr << "LastTpch, +llvm, -spilling" << Endl;
- NKikimr::NMiniKQL::RunTestLastTpch<true, false>();
+ Cout << FancyDuration(streamTimeOrZero) << " | " << FancyDuration(corrGraphTime) << " | " << FancyDuration(corrRefTime) << " | " << diff << " ||" << Endl;
+ Cout.Flush();
}
+};
- if (withSpilling) {
- Cerr << "LastSimple, -llvm, +spilling" << Endl;
- NKikimr::NMiniKQL::RunTestCombineLastSimple<false, true>(runParams);
+void DoFullPass(bool withSpilling)
+{
+ using namespace NKikimr::NMiniKQL;
- Cerr << "LastSimple, +llvm, +spilling" << Endl;
- NKikimr::NMiniKQL::RunTestCombineLastSimple<true, true>(runParams);
+ TWikiResultCollector printout;
- Cerr << "LastTpch, -llvm, +spilling" << Endl;
- NKikimr::NMiniKQL::RunTestLastTpch<false, true>();
+ TRunParams runParams;
- Cerr << "LastTpch, +llvm, +spilling" << Endl;
- NKikimr::NMiniKQL::RunTestLastTpch<true, true>();
- }
+ runParams.NumRuns = 20;
+ runParams.RowsPerRun = 5'000'000;
+ runParams.MaxKey = 1'00 - 1;
+ runParams.LongStringKeys = false;
+
+ const std::vector<size_t> numKeys = {4u, 1000u, 100'000u, 200'000u};
+ const std::vector<size_t> blockSizes = {128u, 8192u};
+
+ auto doSimple = [&printout, numKeys](const TRunParams& params) {
+ for (size_t keyCount : numKeys) {
+ auto runParams = params;
+ runParams.MaxKey = keyCount - 1;
+ RunTestSimple<false>(runParams, printout);
+ RunTestSimple<true>(runParams, printout);
+ }
+ };
+
+ auto doSimpleLast = [&printout, &numKeys, withSpilling](const TRunParams& params) {
+ for (size_t keyCount : numKeys) {
+ auto runParams = params;
+ runParams.MaxKey = keyCount - 1;
+ RunTestCombineLastSimple<false, false>(runParams, printout);
+ RunTestCombineLastSimple<true, false>(runParams, printout);
+ if (withSpilling) {
+ RunTestCombineLastSimple<false, true>(runParams, printout);
+ RunTestCombineLastSimple<true, true>(runParams, printout);
+ }
+ }
+ };
+
+ auto doBlockHashed = [&printout, &numKeys, &blockSizes](const TRunParams& params) {
+ for (size_t keyCount : numKeys) {
+ for (size_t blockSize : blockSizes) {
+ auto runParams = params;
+ runParams.MaxKey = keyCount - 1;
+ runParams.BlockSize = blockSize;
+ RunTestBlockCombineHashedSimple<false, false>(runParams, printout);
+ }
+ }
+ };
+
+ doSimple(runParams);
+ doSimpleLast(runParams);
+ doBlockHashed(runParams);
}
int main(int argc, const char* argv[])
diff --git a/ydb/core/kqp/tools/combiner_perf/bin/ya.make b/ydb/core/kqp/tools/combiner_perf/bin/ya.make
index ec0196df29..2bce61f2f7 100644
--- a/ydb/core/kqp/tools/combiner_perf/bin/ya.make
+++ b/ydb/core/kqp/tools/combiner_perf/bin/ya.make
@@ -21,4 +21,3 @@ SRCS(
)
END()
-
diff --git a/ydb/core/kqp/tools/combiner_perf/converters.h b/ydb/core/kqp/tools/combiner_perf/converters.h
index 8d63107a8d..76a76062ff 100644
--- a/ydb/core/kqp/tools/combiner_perf/converters.h
+++ b/ydb/core/kqp/tools/combiner_perf/converters.h
@@ -1,3 +1,5 @@
+#pragma once
+
#include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
diff --git a/ydb/core/kqp/tools/combiner_perf/printout.h b/ydb/core/kqp/tools/combiner_perf/printout.h
new file mode 100644
index 0000000000..0f2f431e89
--- /dev/null
+++ b/ydb/core/kqp/tools/combiner_perf/printout.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include "run_params.h"
+
+#include <util/datetime/base.h>
+#include <optional>
+
+class TTestResultCollector {
+public:
+ virtual void SubmitTestNameAndParams(const NKikimr::NMiniKQL::TRunParams& runParams, const char* testName, const std::optional<bool> llvm = {}, const std::optional<bool> spilling = {}) = 0;
+
+ virtual void SubmitTimings(const TDuration& graphTime, const TDuration& referenceTime, const std::optional<TDuration> streamTime = {}) = 0;
+
+ virtual ~TTestResultCollector() {};
+};
diff --git a/ydb/core/kqp/tools/combiner_perf/run_params.h b/ydb/core/kqp/tools/combiner_perf/run_params.h
index c3c350d259..a12576e2d2 100644
--- a/ydb/core/kqp/tools/combiner_perf/run_params.h
+++ b/ydb/core/kqp/tools/combiner_perf/run_params.h
@@ -2,7 +2,6 @@
#include <util/system/defaults.h>
-
namespace NKikimr {
namespace NMiniKQL {
diff --git a/ydb/core/kqp/tools/combiner_perf/simple.cpp b/ydb/core/kqp/tools/combiner_perf/simple.cpp
index b9cc26eaae..6ada8c91d3 100644
--- a/ydb/core/kqp/tools/combiner_perf/simple.cpp
+++ b/ydb/core/kqp/tools/combiner_perf/simple.cpp
@@ -1,5 +1,8 @@
#include "simple.h"
+
#include "factories.h"
+#include "streams.h"
+#include "printout.h"
#include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
@@ -13,74 +16,71 @@
namespace NKikimr {
namespace NMiniKQL {
-std::vector<std::pair<i8, double>> MakeSamples() {
- constexpr auto total_samples = 100'000'000ULL;
-
- std::default_random_engine eng;
- std::uniform_int_distribution<int> keys(-100, +100);
- std::uniform_real_distribution<double> unif(-999.0, +999.0);
-
- std::vector<std::pair<i8, double>> samples(total_samples);
-
- eng.seed(std::time(nullptr));
- std::generate(samples.begin(), samples.end(), std::bind(&std::make_pair<i8, double>, std::bind(std::move(keys), std::move(eng)), std::bind(std::move(unif), std::move(eng))));
- return samples;
-}
-
-const std::vector<std::pair<i8, double>> I8Samples = MakeSamples();
-
template<bool LLVM>
-void RunTestSimple()
+void RunTestSimple(const TRunParams& params, TTestResultCollector& printout)
{
TSetup<LLVM> setup(GetPerfTestFactory());
- Cerr << "Simple i8 sample has " << I8Samples.size() << " rows" << Endl;
+ printout.SubmitTestNameAndParams(params, __func__, LLVM);
- double positive = 0.0, negative = 0.0;
- const auto t = TInstant::Now();
- for (const auto& sample : I8Samples) {
- (sample.second > 0.0 ? positive : negative) += sample.second;
- }
- const auto cppTime = TInstant::Now() - t;
+ TString64DataSampler sampler(params.RowsPerRun, params.MaxKey, params.NumRuns, params.LongStringKeys);
+ // or T6464DataSampler sampler(numSamples, maxKey, numIters); -- maybe make selectable from params
+ Cerr << "Sampler type: " << sampler.Describe() << Endl;
TProgramBuilder& pb = *setup.PgmBuilder;
- const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id));
- const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-
- const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(pb.ExpandMap(pb.ToFlow(TRuntimeNode(list, false)),
- [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }), 0ULL,
- [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {pb.AggrGreater(items.front(), pb.NewDataLiteral(0.0))}; },
- [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return items; },
- [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.AggrAdd(state.front(), items.front())}; },
- [&](TRuntimeNode::TList, TRuntimeNode::TList state) -> TRuntimeNode::TList { return state; }),
- [&](TRuntimeNode::TList items) { return items.front(); }
+ const auto streamItemType = pb.NewMultiType({sampler.GetKeyType(pb), pb.NewDataType(NUdf::TDataType<ui64>::Id)});
+ const auto streamType = pb.NewStreamType(streamItemType);
+ const auto streamCallable = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", streamType).Build();
+
+ const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideCombiner(
+ pb.ToFlow(TRuntimeNode(streamCallable, false)),
+ 0ULL,
+ [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return { items.front() }; },
+ [&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList { return { items.back() } ; },
+ [&](TRuntimeNode::TList, TRuntimeNode::TList items, TRuntimeNode::TList state) -> TRuntimeNode::TList {
+ return {pb.AggrAdd(state.front(), items.back())};
+ },
+ [&](TRuntimeNode::TList keys, TRuntimeNode::TList state) -> TRuntimeNode::TList {
+ return {keys.front(), state.front()};
+ }),
+ [&](TRuntimeNode::TList items) { return pb.NewTuple(items); }
));
- const auto graph = setup.BuildGraph(pgmReturn, {list});
- NUdf::TUnboxedValue* items = nullptr;
- graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
- std::transform(I8Samples.cbegin(), I8Samples.cend(), items, [](const std::pair<i8, double> s){ return ToValue<double>(s.second); });
+ const auto graph = setup.BuildGraph(pgmReturn, {streamCallable});
- const auto t1 = TInstant::Now();
- const auto& value = graph->GetValue();
- const auto first = value.GetElement(0);
- const auto second = value.GetElement(1);
- const auto t2 = TInstant::Now();
-
- if (first.template Get<double>() > 0.0) {
- UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), positive);
- UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), negative);
- } else {
- UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), negative);
- UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), positive);
+ // Measure the input stream run time
+ const auto devnullStream = sampler.MakeStream(graph->GetHolderFactory());
+ const auto devnullStart = TInstant::Now();
+ {
+ NUdf::TUnboxedValue columns[2];
+ while (devnullStream->WideFetch(columns, 2) == NUdf::EFetchStatus::Ok) {
+ }
}
+ const auto devnullTime = TInstant::Now() - devnullStart;
+
+ // Reference implementation (sum via an std::unordered_map)
+ auto referenceStream = sampler.MakeStream(graph->GetHolderFactory());
+ const auto t = TInstant::Now();
+ sampler.ComputeReferenceResult(*referenceStream);
+ const auto cppTime = TInstant::Now() - t;
+
+ // Compute graph implementation
+ auto myStream = NUdf::TUnboxedValuePod(sampler.MakeStream(graph->GetHolderFactory()).Release());
+ graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(myStream));
+
+ const auto graphTimeStart = TInstant::Now();
+ const auto& value = graph->GetValue();
+ const auto graphTime = TInstant::Now() - graphTimeStart;
+
+ // Verification
+ sampler.VerifyComputedValueVsReference(value);
- Cerr << "WideCombiner graph runtime is: " << t2 - t1 << " vs. reference C++ implementation: " << cppTime << Endl << Endl;
+ printout.SubmitTimings(graphTime, cppTime, devnullTime);
}
-template void RunTestSimple<false>();
-template void RunTestSimple<true>();
+template void RunTestSimple<false>(const TRunParams& params, TTestResultCollector& printout);
+template void RunTestSimple<true>(const TRunParams& params, TTestResultCollector& printout);
}
}
diff --git a/ydb/core/kqp/tools/combiner_perf/simple.h b/ydb/core/kqp/tools/combiner_perf/simple.h
index e2bc3f44da..193312919f 100644
--- a/ydb/core/kqp/tools/combiner_perf/simple.h
+++ b/ydb/core/kqp/tools/combiner_perf/simple.h
@@ -1,10 +1,13 @@
#pragma once
+#include "run_params.h"
+#include "printout.h"
+
namespace NKikimr {
namespace NMiniKQL {
template<bool LLVM>
-void RunTestSimple();
+void RunTestSimple(const TRunParams& params, TTestResultCollector& printout);
}
}
diff --git a/ydb/core/kqp/tools/combiner_perf/simple_block.cpp b/ydb/core/kqp/tools/combiner_perf/simple_block.cpp
index 9b9a71351b..52ebdc61ca 100644
--- a/ydb/core/kqp/tools/combiner_perf/simple_block.cpp
+++ b/ydb/core/kqp/tools/combiner_perf/simple_block.cpp
@@ -1,7 +1,8 @@
#include "simple_block.h"
#include "factories.h"
-#include "converters.h"
+#include "streams.h"
+#include "printout.h"
#include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
@@ -11,6 +12,8 @@
#include <contrib/libs/apache/arrow/cpp/src/arrow/type_fwd.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_primitive.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/chunked_array.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array.h>
#include <library/cpp/testing/unittest/registar.h>
@@ -19,117 +22,10 @@
namespace NKikimr {
namespace NMiniKQL {
+template<typename K>
+void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<K, uint64_t>& result);
-using T6464Samples = std::vector<std::pair<uint64_t, uint64_t>>;
-using TString64Samples = std::vector<std::pair<std::string, uint64_t>>;
-
-T6464Samples MakeKeyed6464SamplesForBlocks(const size_t numSamples, const unsigned int maxKey) {
- std::default_random_engine eng;
- std::uniform_int_distribution<unsigned int> keys(0, maxKey);
- std::uniform_int_distribution<uint64_t> unif(0, 100000.0);
-
- T6464Samples samples(numSamples);
-
- eng.seed(std::time(nullptr));
- std::generate(samples.begin(), samples.end(),
- [&]() -> auto {
- return std::make_pair<uint64_t, uint64_t>(keys(eng), unif(eng));
- }
- );
-
- return samples;
-}
-
-struct IWideBlockStream : public NUdf::TBoxedValue
-{
- NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final override {
- Y_UNUSED(result);
- ythrow yexception() << "only WideFetch is supported here";
- }
-};
-
-template<typename K, typename V>
-struct TBlockStream : public IWideBlockStream {
- using TSamples = std::vector<std::pair<K, V>>;
-
- TBlockStream(TComputationContext& ctx, const TSamples& samples, size_t iterations, size_t blockSize, const std::vector<TType*> types)
- : Context(ctx)
- , Samples(samples)
- , End(Samples.end())
- , MaxIterations(iterations)
- , CurrSample(Samples.begin())
- , BlockSize(blockSize)
- , Types(types)
- {
- Y_ENSURE(samples.size() > 0);
- }
-
-private:
- TComputationContext& Context;
- const TSamples& Samples;
- const TSamples::const_iterator End;
- const size_t MaxIterations;
- TSamples::const_iterator CurrSample;
- size_t Iteration = 0;
- size_t BlockSize = 1;
- const std::vector<TType*> Types;
-
- const TSamples::value_type* NextSample() {
- if (CurrSample == End) {
- ++Iteration;
- if (Iteration >= MaxIterations) {
- return nullptr;
- }
- CurrSample = Samples.begin();
- }
- return CurrSample++;
- }
-
- bool IsAtTheEnd() const {
- return (CurrSample == End) && (Iteration >= MaxIterations);
- }
-
-public:
- NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) final override {
- if (width != 3) {
- ythrow yexception() << "width 3 expected";
- }
-
- TVector<std::unique_ptr<IArrayBuilder>> builders;
- std::transform(Types.cbegin(), Types.cend(), std::back_inserter(builders),
- [&](const auto& type) {
- return MakeArrayBuilder(TTypeInfoHelper(), type, Context.ArrowMemoryPool, BlockSize, &Context.Builder->GetPgBuilder());
- });
-
- size_t count = 0;
- for (; count < BlockSize; ++count) {
- const auto nextTuple = NextSample();
- if (!nextTuple) {
- break;
- }
-
- NUdf::TUnboxedValue key;
- NativeToUnboxed<false>(nextTuple->first, key);
- builders[0]->Add(key);
-
- NUdf::TUnboxedValue value;
- NativeToUnboxed<false>(nextTuple->second, value);
- builders[1]->Add(value);
- }
-
- if (count > 0) {
- const bool finish = IsAtTheEnd();
- result[0] = Context.HolderFactory.CreateArrowBlock(builders[0]->Build(finish));
- result[1] = Context.HolderFactory.CreateArrowBlock(builders[1]->Build(finish));
- result[2] = Context.HolderFactory.CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(count)));
-
- return NUdf::EFetchStatus::Ok;
- } else {
- return NUdf::EFetchStatus::Finish;
- }
- }
-};
-
+template<>
void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<uint64_t, uint64_t>& result)
{
auto datumKey = TArrowBlock::From(key).GetDatum();
@@ -149,20 +45,55 @@ void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedVal
}
}
+template<>
+void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<std::string, uint64_t>& result)
+{
+ auto datumKey = TArrowBlock::From(key).GetDatum();
+ auto datumValue = TArrowBlock::From(value).GetDatum();
+ UNIT_ASSERT(datumKey.is_arraylike());
+ UNIT_ASSERT(datumValue.is_array());
+
+ const auto ui64values = datumValue.template array_as<arrow::UInt64Array>();
+ UNIT_ASSERT(!!ui64values);
+
+ int64_t valueOffset = 0;
+
+ for (const auto& chunk : datumKey.chunks()) {
+ auto* barray = dynamic_cast<arrow::BinaryArray*>(chunk.get());
+ UNIT_ASSERT(barray != nullptr);
+ for (int64_t i = 0; i < barray->length(); ++i) {
+ auto key = barray->GetString(i);
+ auto val = ui64values->Value(valueOffset);
+ result[key] += val;
+ ++valueOffset;
+ }
+ }
+}
+
+
+template<typename TStream, typename TMap>
+void CalcRefResult(TStream& refStream, TMap& refResult)
+{
+ NUdf::TUnboxedValue columns[3];
+
+ while (refStream->WideFetch(columns, 3) == NUdf::EFetchStatus::Ok) {
+ UpdateMapFromBlocks(columns[0], columns[1], refResult);
+ }
+}
+
+
template<bool LLVM, bool Spilling>
-void RunTestBlockCombineHashedSimple(const TRunParams& params)
+void RunTestBlockCombineHashedSimple(const TRunParams& params, TTestResultCollector& printout)
{
TSetup<LLVM, Spilling> setup(GetPerfTestFactory());
- Cerr << "Data rows total: " << params.RowsPerRun << " x " << params.NumRuns << Endl;
- Cerr << (params.MaxKey + 1) << " distinct numeric keys" << Endl;
- Cerr << "Block size: " << params.BlockSize << Endl;
-
- auto samples = MakeKeyed6464SamplesForBlocks(params.RowsPerRun, params.MaxKey);
+ printout.SubmitTestNameAndParams(params, __func__, LLVM, Spilling);
+
+ auto samples = MakeKeyedString64Samples(params.RowsPerRun, params.MaxKey, false);
TProgramBuilder& pb = *setup.PgmBuilder;
- auto keyBaseType = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+ auto keyBaseType = pb.NewDataType(NUdf::TDataType<char*>::Id);
auto valueBaseType = pb.NewDataType(NUdf::TDataType<ui64>::Id);
auto keyBlockType = pb.NewBlockType(keyBaseType, TBlockType::EShape::Many);
auto valueBlockType = pb.NewBlockType(valueBaseType, TBlockType::EShape::Many);
@@ -194,7 +125,7 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params)
const auto graph = setup.BuildGraph(pgmReturn, {streamCallable});
auto streamMaker = [&]() -> auto {
- return std::unique_ptr<TBlockStream<uint64_t, uint64_t>>(new TBlockStream<uint64_t, uint64_t>(
+ return std::unique_ptr<TBlockKVStream<std::string, uint64_t>>(new TBlockKVStream<std::string, uint64_t>(
graph->GetContext(),
samples,
params.NumRuns,
@@ -203,24 +134,30 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params)
));
};
- std::unordered_map<uint64_t, uint64_t> rawResult;
+ // Compute results directly from raw samples to test the input stream implementation
+ std::unordered_map<std::string, uint64_t> rawResult;
for (const auto& tuple : samples) {
- rawResult[tuple.first] += (tuple.second * params.NumRuns);
+ rawResult[tuple.first] += (tuple.second * params.NumRuns);
}
- std::unordered_map<uint64_t, uint64_t> refResult;
- const auto refStream = streamMaker();
-
- const auto cppStart = TInstant::Now();
+ // Measure the input stream run time
+ const auto devnullStream = streamMaker();
+ const auto devnullStart = TInstant::Now();
{
NUdf::TUnboxedValue columns[3];
-
- while (refStream->WideFetch(columns, 3) == NUdf::EFetchStatus::Ok) {
- UpdateMapFromBlocks(columns[0], columns[1], refResult);
+ while (devnullStream->WideFetch(columns, 3) == NUdf::EFetchStatus::Ok) {
}
}
+ const auto devnullTime = TInstant::Now() - devnullStart;
+
+ // Reference implementation (sum via an std::unordered_map)
+ std::unordered_map<std::string, uint64_t> refResult;
+ const auto refStream = streamMaker();
+ const auto cppStart = TInstant::Now();
+ CalcRefResult(refStream, refResult);
const auto cppTime = TInstant::Now() - cppStart;
+ // Verify the reference stream implementation against the raw samples
UNIT_ASSERT_VALUES_EQUAL(refResult.size(), rawResult.size());
for (const auto& tuple : rawResult) {
auto otherIt = refResult.find(tuple.first);
@@ -228,6 +165,7 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params)
UNIT_ASSERT_VALUES_EQUAL(tuple.second, otherIt->second);
}
+ // Compute graph implementation
auto stream = NUdf::TUnboxedValuePod(streamMaker().release());
graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(stream));
const auto graphStart = TInstant::Now();
@@ -235,16 +173,18 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params)
const auto graphTime = TInstant::Now() - graphStart;
size_t numResultItems = resultList.GetListLength();
+ Cerr << "Result block count: " << numResultItems << Endl;
- std::unordered_map<uint64_t, uint64_t> graphResult;
+ // Verification
+ std::unordered_map<std::string, uint64_t> graphResult;
const auto ptr = resultList.GetElements();
- for (size_t i = 0ULL; i < numResultItems; ++i) {
+ for (size_t i = 0ULL; i < numResultItems; ++i) {
UNIT_ASSERT(ptr[i].GetListLength() >= 2);
const auto elements = ptr[i].GetElements();
-
- UpdateMapFromBlocks(elements[0], elements[1], graphResult);
+
+ UpdateMapFromBlocks(elements[0], elements[1], graphResult);
}
UNIT_ASSERT_VALUES_EQUAL(refResult.size(), graphResult.size());
@@ -254,13 +194,15 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params)
UNIT_ASSERT_VALUES_EQUAL(tuple.second, graphIt->second);
}
- Cerr << "BlockCombineHashed graph runtime is: " << graphTime << " vs. reference C++ implementation: " << cppTime << "" << Endl << Endl;
+ Cerr << "Graph time raw: " << graphTime << Endl;
+ Cerr << "CPP time raw: " << cppTime << Endl;
+ printout.SubmitTimings(graphTime, cppTime, devnullTime);
}
-template void RunTestBlockCombineHashedSimple<false, false>(const TRunParams& params);
-template void RunTestBlockCombineHashedSimple<false, true>(const TRunParams& params);
-template void RunTestBlockCombineHashedSimple<true, false>(const TRunParams& params);
-template void RunTestBlockCombineHashedSimple<true, true>(const TRunParams& params);
+template void RunTestBlockCombineHashedSimple<false, false>(const TRunParams& params, TTestResultCollector& printout);
+template void RunTestBlockCombineHashedSimple<false, true>(const TRunParams& params, TTestResultCollector& printout);
+template void RunTestBlockCombineHashedSimple<true, false>(const TRunParams& params, TTestResultCollector& printout);
+template void RunTestBlockCombineHashedSimple<true, true>(const TRunParams& params, TTestResultCollector& printout);
}
diff --git a/ydb/core/kqp/tools/combiner_perf/simple_block.h b/ydb/core/kqp/tools/combiner_perf/simple_block.h
index b05c5788c4..e57554a10a 100644
--- a/ydb/core/kqp/tools/combiner_perf/simple_block.h
+++ b/ydb/core/kqp/tools/combiner_perf/simple_block.h
@@ -1,12 +1,13 @@
#pragma once
#include "run_params.h"
+#include "printout.h"
namespace NKikimr {
namespace NMiniKQL {
template<bool LLVM, bool Spilling>
-void RunTestBlockCombineHashedSimple(const TRunParams& params);
+void RunTestBlockCombineHashedSimple(const TRunParams& params, TTestResultCollector& printout);
}
}
diff --git a/ydb/core/kqp/tools/combiner_perf/simple_last.cpp b/ydb/core/kqp/tools/combiner_perf/simple_last.cpp
index 1bf311a17f..fe880637c7 100644
--- a/ydb/core/kqp/tools/combiner_perf/simple_last.cpp
+++ b/ydb/core/kqp/tools/combiner_perf/simple_last.cpp
@@ -2,6 +2,8 @@
#include "factories.h"
#include "converters.h"
+#include "streams.h"
+#include "printout.h"
#include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
@@ -15,251 +17,19 @@
namespace NKikimr {
namespace NMiniKQL {
-using T6464Samples = std::vector<std::pair<ui64, ui64>>;
-using TString64Samples = std::vector<std::pair<std::string, ui64>>;
-
-T6464Samples MakeKeyed6464Samples(const size_t numSamples, const unsigned int maxKey) {
- std::default_random_engine eng;
- std::uniform_int_distribution<unsigned int> keys(0, maxKey);
- std::uniform_int_distribution<ui64> unif(0, 100000.0);
-
- T6464Samples samples(numSamples);
-
- eng.seed(std::time(nullptr));
- std::generate(samples.begin(), samples.end(),
- [&]() -> auto {
- return std::make_pair<ui64, ui64>(keys(eng), unif(eng));
- }
- );
-
- return samples;
-}
-
-TString64Samples MakeKeyedString64Samples(const size_t numSamples, const unsigned int maxKey, const bool longStrings) {
- std::default_random_engine eng;
- std::uniform_int_distribution<unsigned int> keys(0, maxKey);
- std::uniform_int_distribution<ui64> unif(0, 100000.0);
-
- TString64Samples samples(numSamples);
-
- eng.seed(std::time(nullptr));
- std::generate(samples.begin(), samples.end(),
- [&]() -> auto {
- auto key = keys(eng);
- std::string strKey;
- if (!longStrings) {
- strKey = std::string(ToString(key));
- } else {
- strKey = Sprintf("%07u.%07u.%07u.", key, key, key);
- }
- return std::make_pair<std::string, ui64>(std::move(strKey), unif(eng));
- }
- );
-
- return samples;
-}
-
-struct IWideStream : public NUdf::TBoxedValue
-{
- NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final {
- Y_UNUSED(result);
- ythrow yexception() << "only WideFetch is supported here";
- }
-
- NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) = 0;
-};
-
-template<typename K, typename V, bool EmbeddedKeys>
-struct TStream : public IWideStream {
- using TSamples = std::vector<std::pair<K, V>>;
-
- TStream(const THolderFactory& holderFactory, const TSamples& samples, size_t iterations)
- : HolderFactory(holderFactory)
- , Samples(samples)
- , End(Samples.end())
- , MaxIterations(iterations)
- , CurrSample(Samples.begin())
- {
- Y_ENSURE(samples.size() > 0);
- }
-
-private:
- const THolderFactory& HolderFactory;
- const TSamples& Samples;
- const TSamples::const_iterator End;
- const size_t MaxIterations;
- TSamples::const_iterator CurrSample = 0;
- size_t Iteration = 0;
-
-public:
- NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) final {
- Y_UNUSED(HolderFactory); // for future use
-
- if (CurrSample == End) {
- ++Iteration;
- if (Iteration >= MaxIterations) {
- return NUdf::EFetchStatus::Finish;
- }
- CurrSample = Samples.begin();
- }
-
- if (width != 2) {
- ythrow yexception() << "width 2 expected";
- }
-
- // TODO: support embedded strings in values?
- NativeToUnboxed<EmbeddedKeys>(CurrSample->first, result[0]);
- NativeToUnboxed<false>(CurrSample->second, result[1]);
-
- ++CurrSample;
-
- return NUdf::EFetchStatus::Ok;
- }
-};
-
-template<typename K, typename V>
-std::unordered_map<K, V> ComputeSumReferenceResult(IWideStream& referenceStream)
-{
- std::unordered_map<K, V> expects;
- {
- NUdf::TUnboxedValue inputs[2];
-
- while (referenceStream.WideFetch(inputs, 2) == NUdf::EFetchStatus::Ok) {
- expects[UnboxedToNative<K>(inputs[0])] += inputs[1].Get<V>();
- }
- }
- return expects;
-}
-
-template<typename K, typename V>
-void VerifyListVsUnorderedMap(const NUdf::TUnboxedValue& pairList, const std::unordered_map<K, V>& map)
-{
- UNIT_ASSERT_VALUES_EQUAL(pairList.GetListLength(), map.size());
-
- const auto ptr = pairList.GetElements();
- for (size_t i = 0ULL; i < map.size(); ++i) {
- const auto elements = ptr[i].GetElements();
- const auto key = UnboxedToNative<K>(elements[0]);
- const auto value = UnboxedToNative<V>(elements[1]);
- const auto ii = map.find(key);
- UNIT_ASSERT(ii != map.end());
- UNIT_ASSERT_VALUES_EQUAL(value, ii->second);
- }
-}
-
-class IDataSampler
-{
- virtual THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const = 0;
- virtual TType* GetKeyType(TProgramBuilder& pb) const = 0;
- virtual void ComputeReferenceResult(IWideStream& referenceStream) = 0;
- virtual void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const = 0;
- virtual std::string Describe() const = 0;
-};
-
-class T6464DataSampler : public IDataSampler
-{
-public:
- TStream<ui64, ui64, false>::TSamples Samples;
- std::unordered_map<ui64, ui64> Expects;
-
- size_t StreamNumIters = 0;
-
- T6464DataSampler(size_t numSamples, size_t maxKey, size_t numIters)
- : Samples(MakeKeyed6464Samples(numSamples, maxKey))
- , StreamNumIters(numIters)
- {
- }
-
- THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override
- {
- return THolder(new TStream<ui64, ui64, false>(holderFactory, Samples, StreamNumIters));
- }
-
- TType* GetKeyType(TProgramBuilder& pb) const override
- {
- return pb.NewDataType(NUdf::TDataType<ui64>::Id);
- }
-
- void ComputeReferenceResult(IWideStream& referenceStream) override
- {
- Y_ENSURE(Expects.empty());
-
- Expects = ComputeSumReferenceResult<ui64, ui64>(referenceStream);
- }
-
- void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override
- {
- VerifyListVsUnorderedMap(value, Expects);
- }
-
- std::string Describe() const override
- {
- return "ui64 keys, ui64 values";
- }
-};
-
-class TString64DataSampler : public IDataSampler
-{
-public:
- TStream<std::string, ui64, false>::TSamples Samples;
- std::unordered_map<std::string, ui64> Expects;
-
- size_t StreamNumIters = 0;
- bool LongStrings = false;
-
- TString64DataSampler(size_t numSamples, size_t maxKey, size_t numIters, bool longStrings)
- : Samples(MakeKeyedString64Samples(numSamples, maxKey, longStrings))
- , StreamNumIters(numIters)
- , LongStrings(longStrings)
- {
- }
-
- THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override
- {
- if (LongStrings) {
- return THolder(new TStream<std::string, ui64, false>(holderFactory, Samples, StreamNumIters));
- } else {
- return THolder(new TStream<std::string, ui64, true>(holderFactory, Samples, StreamNumIters));
- }
- }
-
- TType* GetKeyType(TProgramBuilder& pb) const override
- {
- return pb.NewDataType(NUdf::TDataType<char*>::Id);
- }
-
- void ComputeReferenceResult(IWideStream& referenceStream) override
- {
- Y_ENSURE(Expects.empty());
-
- Expects = ComputeSumReferenceResult<std::string, ui64>(referenceStream);
- }
-
- void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override
- {
- VerifyListVsUnorderedMap(value, Expects);
- }
-
- std::string Describe() const override
- {
- return Sprintf("%s string keys, ui64 values", LongStrings ? "Long (24 byte)" : "Embedded");
- }
-};
-
template<bool LLVM, bool Spilling>
-void RunTestCombineLastSimple(const TRunParams& params)
+void RunTestCombineLastSimple(const TRunParams& params, TTestResultCollector& printout)
{
TSetup<LLVM, Spilling> setup(GetPerfTestFactory());
+ printout.SubmitTestNameAndParams(params, __func__, LLVM, Spilling);
+
const size_t numSamples = params.RowsPerRun;
const size_t numIters = params.NumRuns; // Will process numSamples * numIters items
const size_t maxKey = params.MaxKey; // maxKey + 1 distinct keys, each key multiplied by 64 for some reason
- Cerr << "Data rows total: " << numSamples << " x " << numIters << Endl;
- Cerr << (maxKey + 1) << " distinct numeric keys" << Endl;
-
- // TString64DataSampler sampler(numSamples, maxKey, numIters, params.LongStringKeys);
- T6464DataSampler sampler(numSamples, maxKey, numIters);
+ TString64DataSampler sampler(numSamples, maxKey, numIters, params.LongStringKeys);
+ // or T6464DataSampler sampler(numSamples, maxKey, numIters); -- maybe make selectable from params
Cerr << "Sampler type: " << sampler.Describe() << Endl;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -294,6 +64,16 @@ void RunTestCombineLastSimple(const TRunParams& params)
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
}
+ // Measure the input stream run time
+ const auto devnullStream = sampler.MakeStream(graph->GetHolderFactory());
+ const auto devnullStart = TInstant::Now();
+ {
+ NUdf::TUnboxedValue columns[2];
+ while (devnullStream->WideFetch(columns, 2) == NUdf::EFetchStatus::Ok) {
+ }
+ }
+ const auto devnullTime = TInstant::Now() - devnullStart;
+
// Reference implementation (sum via an std::unordered_map)
auto referenceStream = sampler.MakeStream(graph->GetHolderFactory());
const auto t = TInstant::Now();
@@ -306,18 +86,18 @@ void RunTestCombineLastSimple(const TRunParams& params)
const auto t1 = TInstant::Now();
const auto& value = graph->GetValue();
- const auto t2 = TInstant::Now();
+ const auto graphTime = TInstant::Now() - t1;
// Verification
sampler.VerifyComputedValueVsReference(value);
- Cerr << "WideLastCombiner graph runtime is: " << t2 - t1 << " vs. reference C++ implementation: " << cppTime << "" << Endl << Endl;
+ printout.SubmitTimings(graphTime, cppTime, devnullTime);
}
-template void RunTestCombineLastSimple<false, false>(const TRunParams& params);
-template void RunTestCombineLastSimple<false, true>(const TRunParams& params);
-template void RunTestCombineLastSimple<true, false>(const TRunParams& params);
-template void RunTestCombineLastSimple<true, true>(const TRunParams& params);
+template void RunTestCombineLastSimple<false, false>(const TRunParams& params, TTestResultCollector& printout);
+template void RunTestCombineLastSimple<false, true>(const TRunParams& params, TTestResultCollector& printout);
+template void RunTestCombineLastSimple<true, false>(const TRunParams& params, TTestResultCollector& printout);
+template void RunTestCombineLastSimple<true, true>(const TRunParams& params, TTestResultCollector& printout);
}
}
diff --git a/ydb/core/kqp/tools/combiner_perf/simple_last.h b/ydb/core/kqp/tools/combiner_perf/simple_last.h
index a630fd82c5..90433f8141 100644
--- a/ydb/core/kqp/tools/combiner_perf/simple_last.h
+++ b/ydb/core/kqp/tools/combiner_perf/simple_last.h
@@ -1,12 +1,13 @@
#pragma once
#include "run_params.h"
+#include "printout.h"
namespace NKikimr {
namespace NMiniKQL {
template<bool LLVM, bool Spilling>
-void RunTestCombineLastSimple(const TRunParams& params);
+void RunTestCombineLastSimple(const TRunParams& params, TTestResultCollector& printout);
}
}
diff --git a/ydb/core/kqp/tools/combiner_perf/streams.cpp b/ydb/core/kqp/tools/combiner_perf/streams.cpp
new file mode 100644
index 0000000000..7f0a662250
--- /dev/null
+++ b/ydb/core/kqp/tools/combiner_perf/streams.cpp
@@ -0,0 +1,48 @@
+#include "streams.h"
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+T6464Samples MakeKeyed6464Samples(const size_t numSamples, const unsigned int maxKey) {
+ std::default_random_engine eng;
+ std::uniform_int_distribution<unsigned int> keys(0, maxKey);
+ std::uniform_int_distribution<uint64_t> unif(0, 100000.0);
+
+ T6464Samples samples(numSamples);
+
+ eng.seed(std::time(nullptr));
+ std::generate(samples.begin(), samples.end(),
+ [&]() -> auto {
+ return std::make_pair<uint64_t, uint64_t>(keys(eng), unif(eng));
+ }
+ );
+
+ return samples;
+}
+
+TString64Samples MakeKeyedString64Samples(const size_t numSamples, const unsigned int maxKey, const bool longStrings) {
+ std::default_random_engine eng;
+ std::uniform_int_distribution<unsigned int> keys(0, maxKey);
+ std::uniform_int_distribution<uint64_t> unif(0, 100000.0);
+
+ TString64Samples samples(numSamples);
+
+ eng.seed(std::time(nullptr));
+ std::generate(samples.begin(), samples.end(),
+ [&]() -> auto {
+ auto key = keys(eng);
+ std::string strKey;
+ if (!longStrings) {
+ strKey = std::string(ToString(key));
+ } else {
+ strKey = Sprintf("%07u.%07u.%07u.", key, key, key);
+ }
+ return std::make_pair<std::string, uint64_t>(std::move(strKey), unif(eng));
+ }
+ );
+
+ return samples;
+}
+
+}
+} \ No newline at end of file
diff --git a/ydb/core/kqp/tools/combiner_perf/streams.h b/ydb/core/kqp/tools/combiner_perf/streams.h
new file mode 100644
index 0000000000..cb955849f9
--- /dev/null
+++ b/ydb/core/kqp/tools/combiner_perf/streams.h
@@ -0,0 +1,290 @@
+// Sample data generators and streams
+
+#pragma once
+
+#include "converters.h"
+
+#include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h>
+#include <yql/essentials/minikql/computation/mkql_block_builder.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+using T6464Samples = std::vector<std::pair<ui64, ui64>>;
+T6464Samples MakeKeyed6464Samples(const size_t numSamples, const unsigned int maxKey);
+
+using TString64Samples = std::vector<std::pair<std::string, ui64>>;
+TString64Samples MakeKeyedString64Samples(const size_t numSamples, const unsigned int maxKey, const bool longStrings);
+
+struct IWideStream : public NUdf::TBoxedValue
+{
+ NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final override {
+ Y_UNUSED(result);
+ ythrow yexception() << "only WideFetch is supported here";
+ }
+
+ NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) override = 0;
+};
+
+template<typename K, typename V, bool EmbeddedKeys>
+struct TKVStream : public IWideStream {
+ using TSamples = std::vector<std::pair<K, V>>;
+
+ TKVStream(const THolderFactory& holderFactory, const TSamples& samples, size_t iterations)
+ : HolderFactory(holderFactory)
+ , Samples(samples)
+ , End(Samples.end())
+ , MaxIterations(iterations)
+ , CurrSample(Samples.begin())
+ {
+ Y_ENSURE(samples.size() > 0);
+ }
+
+private:
+ const THolderFactory& HolderFactory;
+ const TSamples& Samples;
+ const TSamples::const_iterator End;
+ const size_t MaxIterations;
+ TSamples::const_iterator CurrSample = 0;
+ size_t Iteration = 0;
+
+public:
+ NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) final {
+ Y_UNUSED(HolderFactory); // for future use
+
+ if (CurrSample == End) {
+ ++Iteration;
+ if (Iteration >= MaxIterations) {
+ return NUdf::EFetchStatus::Finish;
+ }
+ CurrSample = Samples.begin();
+ }
+
+ if (width != 2) {
+ ythrow yexception() << "width 2 expected";
+ }
+
+ // TODO: support embedded strings in values?
+ NativeToUnboxed<EmbeddedKeys>(CurrSample->first, result[0]);
+ NativeToUnboxed<false>(CurrSample->second, result[1]);
+
+ ++CurrSample;
+
+ return NUdf::EFetchStatus::Ok;
+ }
+};
+
+template<typename K, typename V>
+std::unordered_map<K, V> ComputeSumReferenceResult(IWideStream& referenceStream)
+{
+ std::unordered_map<K, V> expects;
+ {
+ NUdf::TUnboxedValue inputs[2];
+
+ while (referenceStream.WideFetch(inputs, 2) == NUdf::EFetchStatus::Ok) {
+ expects[UnboxedToNative<K>(inputs[0])] += inputs[1].Get<V>();
+ }
+ }
+ return expects;
+}
+
+template<typename K, typename V>
+void VerifyListVsUnorderedMap(const NUdf::TUnboxedValue& pairList, const std::unordered_map<K, V>& map)
+{
+ UNIT_ASSERT_VALUES_EQUAL(pairList.GetListLength(), map.size());
+
+ const auto ptr = pairList.GetElements();
+ for (size_t i = 0ULL; i < map.size(); ++i) {
+ const auto elements = ptr[i].GetElements();
+ const auto key = UnboxedToNative<K>(elements[0]);
+ const auto value = UnboxedToNative<V>(elements[1]);
+ const auto ii = map.find(key);
+ UNIT_ASSERT(ii != map.end());
+ UNIT_ASSERT_VALUES_EQUAL(value, ii->second);
+ }
+}
+
+class IDataSampler
+{
+ virtual THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const = 0;
+ virtual TType* GetKeyType(TProgramBuilder& pb) const = 0;
+ virtual void ComputeReferenceResult(IWideStream& referenceStream) = 0;
+ virtual void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const = 0;
+ virtual std::string Describe() const = 0;
+};
+
+class T6464DataSampler : public IDataSampler
+{
+public:
+ TKVStream<ui64, ui64, false>::TSamples Samples;
+ std::unordered_map<ui64, ui64> Expects;
+
+ size_t StreamNumIters = 0;
+
+ T6464DataSampler(size_t numSamples, size_t maxKey, size_t numIters)
+ : Samples(MakeKeyed6464Samples(numSamples, maxKey))
+ , StreamNumIters(numIters)
+ {
+ }
+
+ THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override
+ {
+ return THolder(new TKVStream<ui64, ui64, false>(holderFactory, Samples, StreamNumIters));
+ }
+
+ TType* GetKeyType(TProgramBuilder& pb) const override
+ {
+ return pb.NewDataType(NUdf::TDataType<ui64>::Id);
+ }
+
+ void ComputeReferenceResult(IWideStream& referenceStream) override
+ {
+ Y_ENSURE(Expects.empty());
+
+ Expects = ComputeSumReferenceResult<ui64, ui64>(referenceStream);
+ }
+
+ void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override
+ {
+ VerifyListVsUnorderedMap(value, Expects);
+ }
+
+ std::string Describe() const override
+ {
+ return "ui64 keys, ui64 values";
+ }
+};
+
+class TString64DataSampler : public IDataSampler
+{
+public:
+ TKVStream<std::string, ui64, false>::TSamples Samples;
+ std::unordered_map<std::string, ui64> Expects;
+
+ size_t StreamNumIters = 0;
+ bool LongStrings = false;
+
+ TString64DataSampler(size_t numSamples, size_t maxKey, size_t numIters, bool longStrings)
+ : Samples(MakeKeyedString64Samples(numSamples, maxKey, longStrings))
+ , StreamNumIters(numIters)
+ , LongStrings(longStrings)
+ {
+ }
+
+ THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override
+ {
+ if (LongStrings) {
+ return THolder(new TKVStream<std::string, ui64, false>(holderFactory, Samples, StreamNumIters));
+ } else {
+ return THolder(new TKVStream<std::string, ui64, true>(holderFactory, Samples, StreamNumIters));
+ }
+ }
+
+ TType* GetKeyType(TProgramBuilder& pb) const override
+ {
+ return pb.NewDataType(NUdf::TDataType<char*>::Id);
+ }
+
+ void ComputeReferenceResult(IWideStream& referenceStream) override
+ {
+ Y_ENSURE(Expects.empty());
+
+ Expects = ComputeSumReferenceResult<std::string, ui64>(referenceStream);
+ }
+
+ void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override
+ {
+ VerifyListVsUnorderedMap(value, Expects);
+ }
+
+ std::string Describe() const override
+ {
+ return Sprintf("%s string keys, ui64 values", LongStrings ? "Long (24 byte)" : "Embedded");
+ }
+};
+
+
+template<typename K, typename V>
+struct TBlockKVStream : public IWideStream {
+ using TSamples = std::vector<std::pair<K, V>>;
+
+ TBlockKVStream(TComputationContext& ctx, const TSamples& samples, size_t iterations, size_t blockSize, const std::vector<TType*> types)
+ : Context(ctx)
+ , Samples(samples)
+ , End(Samples.end())
+ , MaxIterations(iterations)
+ , CurrSample(Samples.begin())
+ , BlockSize(blockSize)
+ , Types(types)
+ {
+ Y_ENSURE(samples.size() > 0);
+ }
+
+private:
+ TComputationContext& Context;
+ const TSamples& Samples;
+ const TSamples::const_iterator End;
+ const size_t MaxIterations;
+ TSamples::const_iterator CurrSample;
+ size_t Iteration = 0;
+ size_t BlockSize = 1;
+ const std::vector<TType*> Types;
+
+ const TSamples::value_type* NextSample() {
+ if (CurrSample == End) {
+ ++Iteration;
+ if (Iteration >= MaxIterations) {
+ return nullptr;
+ }
+ CurrSample = Samples.begin();
+ }
+ return CurrSample++;
+ }
+
+ bool IsAtTheEnd() const {
+ return (CurrSample == End) && (Iteration >= MaxIterations);
+ }
+
+public:
+ NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) final override {
+ if (width != 3) {
+ ythrow yexception() << "width 3 expected";
+ }
+
+ TVector<std::unique_ptr<IArrayBuilder>> builders;
+ std::transform(Types.cbegin(), Types.cend(), std::back_inserter(builders),
+ [&](const auto& type) {
+ return MakeArrayBuilder(TTypeInfoHelper(), type, Context.ArrowMemoryPool, BlockSize, &Context.Builder->GetPgBuilder());
+ });
+
+ size_t count = 0;
+ for (; count < BlockSize; ++count) {
+ const auto nextTuple = NextSample();
+ if (!nextTuple) {
+ break;
+ }
+
+ NUdf::TUnboxedValue key;
+ NativeToUnboxed<false>(nextTuple->first, key);
+ builders[0]->Add(key);
+
+ NUdf::TUnboxedValue value;
+ NativeToUnboxed<false>(nextTuple->second, value);
+ builders[1]->Add(value);
+ }
+
+ if (count > 0) {
+ const bool finish = IsAtTheEnd();
+ result[0] = Context.HolderFactory.CreateArrowBlock(builders[0]->Build(finish));
+ result[1] = Context.HolderFactory.CreateArrowBlock(builders[1]->Build(finish));
+ result[2] = Context.HolderFactory.CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(count)));
+
+ return NUdf::EFetchStatus::Ok;
+ } else {
+ return NUdf::EFetchStatus::Finish;
+ }
+ }
+};
+
+}
+}
diff --git a/ydb/core/kqp/tools/combiner_perf/ya.make b/ydb/core/kqp/tools/combiner_perf/ya.make
index 567a9ccf29..d2e15dca53 100644
--- a/ydb/core/kqp/tools/combiner_perf/ya.make
+++ b/ydb/core/kqp/tools/combiner_perf/ya.make
@@ -41,12 +41,13 @@ CFLAGS(
ENDIF()
SRCS(
- factories.cpp
converters.cpp
+ factories.cpp
simple.cpp
+ simple_block.cpp
simple_last.cpp
+ streams.cpp
tpch_last.cpp
- simple_block.cpp
)
END()