diff options
author | Pavel Zuev <pzuev@ydb.tech> | 2025-05-05 16:47:44 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-05 11:47:44 +0000 |
commit | 8e93406a08599cd346ecbe48e9034fae0fff2e7f (patch) | |
tree | 1c2f3d43f961c6b4c422a80b3c55a945ec26f60c | |
parent | a8596d8b407dadf9e47f664cfd242b9a90f10967 (diff) | |
download | ydb-8e93406a08599cd346ecbe48e9034fae0fff2e7f.tar.gz |
Support for selectable hashmap implementation in the combiner_perf (#18010)
-rwxr-xr-x | ydb/core/kqp/tools/combiner_perf/bin/format_markdown.py | 75 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/bin/main.cpp | 44 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/converters.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/hashmaps.h | 136 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/printout.cpp | 39 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/printout.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/run_params.h | 10 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/simple.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/simple_block.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/simple_last.cpp | 19 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/streams.cpp | 199 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/streams.h | 126 | ||||
-rw-r--r-- | ydb/core/kqp/tools/combiner_perf/value_wrapper.h | 59 |
13 files changed, 585 insertions, 166 deletions
diff --git a/ydb/core/kqp/tools/combiner_perf/bin/format_markdown.py b/ydb/core/kqp/tools/combiner_perf/bin/format_markdown.py index 586e87a24b8..cc11128c36d 100755 --- a/ydb/core/kqp/tools/combiner_perf/bin/format_markdown.py +++ b/ydb/core/kqp/tools/combiner_perf/bin/format_markdown.py @@ -38,18 +38,14 @@ def gen_chart(ref_time, graph_time, llvm_time = None): out = '' out += draw_line(ref_time, None, 'C++', None) - if ref_time == 0: - hue = None - else: - hue = 120 - int(120.0 * ((graph_time / ref_time) - 0.5) / 1.5) # Map [0.5, 2] -> [120, 0] - hue = max(0, min(hue, 120)) # clamp to the [0, 120] range; hue 0 = red, hue 120 = green - def add_colored_bar(ref_value, result_value, title): + hue = None + comment = None if result_value: shame_ratio = result_value / ref_value comment = 'x %.1f' % shame_ratio - else: - comment = None + hue = 120 - int(120.0 * (shame_ratio - 0.5) / 1.5) # Map [0.5, 2] -> [120, 0] + hue = max(0, min(hue, 120)) # clamp to the [0, 120] range; hue 0 = red, hue 120 = green return draw_line(result_value, hue, title, comment) out += add_colored_bar(ref_time, graph_time, 'Graph') @@ -87,16 +83,22 @@ def format_time(ms): return '%.2f' % (ms / 1000.0) def format_mem(bytez): + if bytez is None: + return ' ' return '%.1f' % (bytez / (1024.0 * 1024.0)) def do_merge_llvm(samples): - sorted_samples = sorted(samples, key=lambda sample: sample.get('llvm', False)) output_samples = [] + for sample in samples: + if not sample.get('llvm', False): + output_samples.append(sample) + + sorted_samples = sorted(samples, key=lambda sample: sample.get('llvm', False)) index = {} for sample in sorted_samples: is_llvm = sample.get('llvm', False) - key = (sample['testName'], sample['numKeys'], sample_rows(sample), sample.get('spilling', False), sample.get('blockSize', 0), sample.get('combinerMemLimit', 0)) + key = (sample['testName'], sample['numKeys'], sample_rows(sample), sample.get('spilling', False), sample.get('blockSize', 0), sample.get('combinerMemLimit', 0), sample['hashType']) if key in index and not is_llvm: raise Exception('Attempted to merge two non-LLVM result samples, key = %s' % repr(key)) if key not in index and is_llvm: @@ -109,10 +111,48 @@ def do_merge_llvm(samples): index[key]['llvmCleanTime'] = result_time_or_zero else: index[key] = sample - output_samples.append(sample) return output_samples +def samples_sorted_by_section(samples): + sort_orders = { + 'testName': {}, + 'totalRows': {}, + 'spilling': {}, + 'blockSize': {}, + 'combinerMemLimit': {}, + 'hashType': {}, + 'numKeys': {}, + } + + for sample in samples: + for key in sort_orders.keys(): + if key in sample: + so = sort_orders[key] + value = sample[key] + if value not in so: + so[value] = len(so) + + def sort_order(sample): + result = [] + for key in sort_orders.keys(): + if key not in sample: + result.append(-1) + continue + result.append(sort_orders[key][sample[key]]) + return result + + return sorted(samples, key=sort_order) + + +def friendly_hash_name(hash_type): + types = { + 'std': 'std::unordered_map', + 'absl': 'absl::flat_hash_map', + } + return types[hash_type] + + def do_format(merge_llvm): per_section = collections.defaultdict(list) @@ -122,13 +162,17 @@ def do_format(merge_llvm): if not line: continue sample = json.loads(line) + sample['totalRows'] = sample_rows(sample) + sample['hashType'] = sample.get('hashType', 'std') all_samples.append(sample) + all_samples = samples_sorted_by_section(all_samples) + if merge_llvm: all_samples = do_merge_llvm(all_samples) for sample in all_samples: - section_name = (sample['testName'], sample_rows(sample), sample.get('llvm', False), sample.get('spilling', False), sample.get('blockSize', 0), sample.get('combinerMemLimit', 0)) + section_name = (sample['testName'], sample_rows(sample), sample.get('llvm', False), sample.get('spilling', False), sample.get('blockSize', 0), sample.get('combinerMemLimit', 0), sample['hashType']) per_section[section_name].append(sample) for _, samples in per_section.items(): @@ -150,6 +194,8 @@ def do_format(merge_llvm): memlimit_formatted = format_mem(memlimit) traits.append(f'{memlimit_formatted} MB RAM limit') traits.append(f'{num_rows_formatted} input rows') + hash_name = friendly_hash_name(samples[0]['hashType']) + traits.append(f'{hash_name}') traits_str = ', '.join(traits) own_times = [] @@ -173,7 +219,7 @@ def do_format(merge_llvm): ] headers += [ 'MaxRSS delta, MB', - 'Bytes per key', + 'Reference MaxRSS delta, MB', ] print(''.join(['<th>%s</th>' % item for item in headers]) + '\n') @@ -201,9 +247,8 @@ def do_format(merge_llvm): if has_llvm_column: cols.append(format_time(llvm_time_or_zero)) cols.append(format_mem(sample['maxRssDelta'])) - bytes_per_key = sample['maxRssDelta'] // sample['numKeys'] + cols.append(format_mem(sample.get('referenceMaxRssDelta', None))) - cols.append(str(bytes_per_key) if 0 < bytes_per_key < 10000 else ' ') print('<tr>' + ''.join(['<td>%s</td>' % col for col in cols]) + '</tr>\n') print('</table>\n:::\n') diff --git a/ydb/core/kqp/tools/combiner_perf/bin/main.cpp b/ydb/core/kqp/tools/combiner_perf/bin/main.cpp index 6871932de52..28a6df0c41b 100644 --- a/ydb/core/kqp/tools/combiner_perf/bin/main.cpp +++ b/ydb/core/kqp/tools/combiner_perf/bin/main.cpp @@ -19,6 +19,20 @@ using NKikimr::NMiniKQL::TRunParams; +TStringBuf HashMapTypeName(NKikimr::NMiniKQL::EHashMapImpl implType) +{ + switch (implType) { + case NKikimr::NMiniKQL::EHashMapImpl::UnorderedMap: + return "std"; + case NKikimr::NMiniKQL::EHashMapImpl::Absl: + return "absl"; + case NKikimr::NMiniKQL::EHashMapImpl::YqlRobinHood: + return "robinhood"; + default: + ythrow yexception() << "Unknown hashmap impl type"; + } +} + class TPrintingResultCollector : public TTestResultCollector { public: @@ -38,6 +52,7 @@ public: Cout << "Block size: " << runParams.BlockSize << Endl; Cout << "Long strings: " << (runParams.LongStringKeys ? "yes" : "no") << Endl; Cout << "Combiner mem limit: " << runParams.WideCombinerMemLimit << Endl; + Cout << "Hash map type: " << HashMapTypeName(runParams.ReferenceHashType) << Endl; Cout << Endl; Cout << "Graph runtime is: " << result.ResultTime << " vs. reference C++ implementation: " << result.ReferenceTime << Endl; @@ -81,6 +96,7 @@ public: out["longStringKeys"] = runParams.LongStringKeys; out["numKeys"] = runParams.NumKeys; out["combinerMemLimit"] = runParams.WideCombinerMemLimit; + out["hashType"] = HashMapTypeName(runParams.ReferenceHashType); out["generatorTime"] = result.GeneratorTime.MilliSeconds(); out["resultTime"] = result.ResultTime.MilliSeconds(); @@ -99,9 +115,9 @@ void DoFullPass(TRunParams runParams, bool withSpilling) TJsonResultCollector printout; - // const std::vector<size_t> numKeys = {4u, 1000u, 100'000u, 1'000'000u, 10'000'000, 30'000'000}; - // const std::vector<size_t> numKeys = {60'000'000, 120'000'000}; - const std::vector<size_t> numKeys = {1'000'000u}; + const std::vector<size_t> numKeys = {4u, 1000u, 100'000u, 1'000'000u, 10'000'000}; + //const std::vector<size_t> numKeys = {60'000'000, 120'000'000}; + //const std::vector<size_t> numKeys = {30'000'000u}; const std::vector<size_t> blockSizes = {128u, 8192u}; auto doSimple = [&printout, numKeys](const TRunParams& params) { @@ -244,6 +260,23 @@ int main(int argc, const char* argv[]) }) .Help("Input data type: string key -> ui64 numeric value or ui64 numeric key -> ui64 numeric value"); + options.AddLongOption("hashmap") + .Choices({"std", "absl", "robinhood"}) + .RequiredArgument() + .Handler1([&](const NLastGetopt::TOptsParser* option) { + auto val = TStringBuf(option->CurVal()); + if (val == "std") { + runParams.ReferenceHashType = NKikimr::NMiniKQL::EHashMapImpl::UnorderedMap; + } else if (val == "absl") { + runParams.ReferenceHashType = NKikimr::NMiniKQL::EHashMapImpl::Absl; + } else if (val == "robinhood") { + runParams.ReferenceHashType = NKikimr::NMiniKQL::EHashMapImpl::YqlRobinHood; + } else { + ythrow yexception() << "Unimplemented hash map type: " << val; + } + }) + .Help("Hash map type (std::unordered_map or absl::dense_hash_map)"); + options .AddLongOption('t', "test") .Choices({"combiner", "last-combiner", "block-combiner"}) @@ -262,6 +295,10 @@ int main(int argc, const char* argv[]) }) .Help("Enable single test run mode"); + options.AddLongOption("no-verify").NoArgument().Handler0([&](){ + runParams.EnableVerification = false; + }); + options .AddLongOption('m', "mode") .Choices({"gen", "ref", "graph", "all"}) @@ -305,6 +342,7 @@ int main(int argc, const char* argv[]) if (testType != ETestType::All) { DoSelectedTest(runParams, testType, llvm, spilling); } else { + runParams.AlwaysSubprocess = true; DoFullPass(runParams, spilling); } diff --git a/ydb/core/kqp/tools/combiner_perf/converters.h b/ydb/core/kqp/tools/combiner_perf/converters.h index 76a76062ffd..1eefb65f64c 100644 --- a/ydb/core/kqp/tools/combiner_perf/converters.h +++ b/ydb/core/kqp/tools/combiner_perf/converters.h @@ -13,16 +13,16 @@ namespace NKikimr { namespace NMiniKQL { template<bool Embedded> -void NativeToUnboxed(const ui64 value, NUdf::TUnboxedValue& result) +void NativeToUnboxed(const ui64 value, NUdf::TUnboxedValuePod& result) { result = NUdf::TUnboxedValuePod(value); } template<bool Embedded> -void NativeToUnboxed(const std::string& value, NUdf::TUnboxedValue& result) +void NativeToUnboxed(const std::string& value, NUdf::TUnboxedValuePod& result) { if constexpr (Embedded) { - result = NUdf::TUnboxedValue::Embedded(value); + result = NUdf::TUnboxedValuePod::Embedded(value); } else { result = NUdf::TUnboxedValuePod(NUdf::TStringValue(value)); } diff --git a/ydb/core/kqp/tools/combiner_perf/hashmaps.h b/ydb/core/kqp/tools/combiner_perf/hashmaps.h new file mode 100644 index 00000000000..255cf2470fc --- /dev/null +++ b/ydb/core/kqp/tools/combiner_perf/hashmaps.h @@ -0,0 +1,136 @@ +// Hash map implementation wrappers + +#pragma once + +#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h> +#include <yql/essentials/public/udf/udf_value.h> + +#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h> + +#include <unordered_map> + +namespace NKikimr { +namespace NMiniKQL { + +template<typename K, typename V> +struct TUnorderedMapImpl +{ + using TValueType = V; + using TMapType = std::unordered_map<K, V>; + constexpr static bool CustomOps = false; +}; + +template<typename K, typename V> +struct TAbslMapImpl +{ + using TValueType = V; + using TMapType = absl::flat_hash_map<K, V>; + constexpr static bool CustomOps = false; +}; + +template<typename K, typename V, typename TEqualTo = std::equal_to<K>, typename THash = std::hash<K>> +struct TRobinHoodMapImplBase +{ + // Warning: this implementation leaks long strings because it can't call destructors. + // Also it moves keys and values by simply copying bytes so take care. + using TValueType = V; + using TMapType = TRobinHoodHashFixedMap<K, V, TEqualTo, THash>; + constexpr static bool CustomOps = true; + + static void AggregateByKey(TMapType& map, const K& key, const V& delta) + { + bool isNew = false; + auto ptr = map.Insert(key, isNew); + if (isNew) { + *(V*)map.GetMutablePayload(ptr) = delta; + map.CheckGrow(); + } else { + *(V*)map.GetMutablePayload(ptr) += delta; + } + } + + template<typename Callback> + static void IteratePairs(const TMapType& map, Callback&& callback) + { + // TODO: GetPayload and IsValid should be const + for (const char* iter = map.Begin(); iter != map.End(); map.Advance(iter)) { + if (!const_cast<TMapType&>(map).IsValid(iter)) { + continue; + } + const auto& key = map.GetKey(iter); + const auto& value = *(V*)(const_cast<TMapType&>(map)).GetPayload(iter); + callback(key, value); + } + } + + static size_t Size(const TMapType& map) + { + return map.GetSize(); + } +}; + +template<typename K, typename V> +struct TRobinHoodMapImpl: public TRobinHoodMapImplBase<K, V> +{ +}; + +template<typename V> +struct TRobinHoodMapImpl<std::string, V> +{ + using TValueType = V; + + struct TEqualTo + { + bool operator()(const NYql::NUdf::TUnboxedValuePod& lhs, const NYql::NUdf::TUnboxedValuePod& rhs) { + return lhs.AsStringRef() == rhs.AsStringRef(); + } + }; + + struct THash + { + absl::Hash<std::string_view> AbslHash; + + size_t operator()(const NYql::NUdf::TUnboxedValuePod& val) { + auto result = AbslHash(val.AsStringRef()); + return result; + } + }; + + using TBase = TRobinHoodMapImplBase<std::string, V>; + using TRealBase = TRobinHoodMapImplBase<NYql::NUdf::TUnboxedValuePod, V, TEqualTo, THash>; + using TMapType = TRealBase::TMapType; + + constexpr static bool CustomOps = true; + + static void AggregateByKey(TMapType& map, const std::string& key, const V& delta) + { + NYql::NUdf::TUnboxedValuePod ub = NYql::NUdf::TUnboxedValuePod::Embedded(NYql::NUdf::TStringRef(key)); + TRealBase::AggregateByKey(map, ub, delta); + } + + template<typename Callback> + static void IteratePairs(const TMapType& map, Callback&& callback) + { + TRealBase::IteratePairs(map, [callback](const NYql::NUdf::TUnboxedValuePod& k, const V& v) { + callback(std::string(k.AsStringRef()), v); + }); + } + + static size_t Size(const TMapType& map) + { + return TRealBase::Size(map); + } +}; + +template<typename TMapImpl> +bool MapEmpty(const typename TMapImpl::TMapType& map) +{ + if constexpr (TMapImpl::CustomOps) { + return TMapImpl::Size(map) == 0; + } else { + return map.empty(); + } +} + +} +}
\ No newline at end of file diff --git a/ydb/core/kqp/tools/combiner_perf/printout.cpp b/ydb/core/kqp/tools/combiner_perf/printout.cpp index 2c4524f8afc..137a3bea441 100644 --- a/ydb/core/kqp/tools/combiner_perf/printout.cpp +++ b/ydb/core/kqp/tools/combiner_perf/printout.cpp @@ -2,6 +2,10 @@ #include <sys/resource.h> +#if defined (_darwin_) +#include <mach/mach.h> +#endif + long GetMaxRSS() { rusage usage; @@ -16,7 +20,7 @@ long GetMaxRSSDelta(const long prevMaxRss) long maxRss = GetMaxRSS(); if (maxRss <= 0 || prevMaxRss <= 0 || maxRss < prevMaxRss) { - ythrow yexception() << "Bad maxRSS measurement, before: " << prevMaxRss << ", after: " << maxRss << Endl; + ythrow yexception() << "Bad maxRSS measurement, before: " << prevMaxRss << ", after: " << maxRss << Endl; } #if defined (_darwin_) @@ -28,6 +32,39 @@ long GetMaxRSSDelta(const long prevMaxRss) return (maxRss - prevMaxRss) * factor; } +TDuration GetThreadCPUTime() +{ + ui64 us = 0; + +#if defined (_darwin_) + thread_basic_info_data_t info = {}; + mach_msg_type_number_t info_count = THREAD_BASIC_INFO_COUNT; + kern_return_t kern_err = thread_info( + mach_thread_self(), + THREAD_BASIC_INFO, + (thread_info_t)&info, + &info_count + ); + if (kern_err == KERN_SUCCESS) { + us = 1000000ULL * info.user_time.seconds + info.user_time.microseconds + 1000000ULL*info.system_time.seconds + info.system_time.microseconds; + } +#else + us = ThreadCPUTime(); +#endif + + if (!us) { + ythrow yexception() << "Failed to obtain thread CPU time"; + } + + return TDuration::MicroSeconds(us); +} + +TDuration GetThreadCPUTimeDelta(const TDuration startTime) +{ + TDuration current = GetThreadCPUTime(); + return current - startTime; +} + void MergeRunResults(const TRunResult& src, TRunResult& dst) { dst.ResultTime = Min(src.ResultTime, dst.ResultTime); diff --git a/ydb/core/kqp/tools/combiner_perf/printout.h b/ydb/core/kqp/tools/combiner_perf/printout.h index 1cb02b9a3a9..8c700bd6e91 100644 --- a/ydb/core/kqp/tools/combiner_perf/printout.h +++ b/ydb/core/kqp/tools/combiner_perf/printout.h @@ -40,6 +40,10 @@ public: }; // ru_maxrss from rusage, converted to bytes -long GetMaxRSS(); +long Y_NO_INLINE GetMaxRSS(); -long GetMaxRSSDelta(const long prevMaxRss);
\ No newline at end of file +long Y_NO_INLINE GetMaxRSSDelta(const long prevMaxRss); + +TDuration Y_NO_INLINE GetThreadCPUTime(); + +TDuration Y_NO_INLINE GetThreadCPUTimeDelta(const TDuration startTime);
\ No newline at end of file diff --git a/ydb/core/kqp/tools/combiner_perf/run_params.h b/ydb/core/kqp/tools/combiner_perf/run_params.h index 406977b1a19..9f6b8a5161a 100644 --- a/ydb/core/kqp/tools/combiner_perf/run_params.h +++ b/ydb/core/kqp/tools/combiner_perf/run_params.h @@ -18,9 +18,16 @@ enum class ESamplerType { UI64KeysUI64Values, }; +enum class EHashMapImpl { + UnorderedMap, + Absl, + YqlRobinHood, +}; + struct TRunParams { ETestMode TestMode = ETestMode::Full; ESamplerType SamplerType = ESamplerType::StringKeysUI64Values; + EHashMapImpl ReferenceHashType = EHashMapImpl::UnorderedMap; std::optional<ui64> RandomSeed; int NumAttempts = 0; size_t RowsPerRun = 0; @@ -28,8 +35,11 @@ struct TRunParams { size_t NumKeys = 0; // for numeric keys, the range is [0..NumKeys-1] size_t BlockSize = 0; size_t WideCombinerMemLimit = 0; + size_t NumAggregations = 1; bool LongStringKeys = false; bool MeasureReferenceMemory = false; + bool AlwaysSubprocess = false; + bool EnableVerification = true; }; } diff --git a/ydb/core/kqp/tools/combiner_perf/simple.cpp b/ydb/core/kqp/tools/combiner_perf/simple.cpp index 435df4bbbc8..024a7fc9bca 100644 --- a/ydb/core/kqp/tools/combiner_perf/simple.cpp +++ b/ydb/core/kqp/tools/combiner_perf/simple.cpp @@ -63,11 +63,11 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool needsVerificati // Compute node implementation Cerr << "Compute graph result" << Endl; - const auto graphTimeStart = TInstant::Now(); + const auto graphTimeStart = GetThreadCPUTime(); size_t lineCount = CountWideStreamOutputs<2>(computeGraphPtr->GetValue()); Cerr << lineCount << Endl; - return TInstant::Now() - graphTimeStart; + return GetThreadCPUTimeDelta(graphTimeStart); }; auto measureRefTime = [&](auto& computeGraphPtr, IDataSampler& sampler) { @@ -75,10 +75,10 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool needsVerificati Cerr << "Compute reference result" << Endl; auto referenceStream = sampler.MakeStream(computeGraphPtr->GetHolderFactory()); - const auto cppTimeStart = TInstant::Now(); + const auto cppTimeStart = GetThreadCPUTime(); sampler.ComputeReferenceResult(*referenceStream); - return TInstant::Now() - cppTimeStart; + return GetThreadCPUTimeDelta(cppTimeStart); }; auto graphRun1 = BuildGraph(setup, *sampler, params.WideCombinerMemLimit); @@ -106,13 +106,13 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool needsVerificati // Measure the input stream run time Cerr << "Generator run" << Endl; const auto devnullStream = sampler->MakeStream(graphRun1->GetHolderFactory()); - const auto devnullStart = TInstant::Now(); + const auto devnullStart = GetThreadCPUTime(); { NUdf::TUnboxedValue columns[2]; while (devnullStream->WideFetch(columns, 2) == NUdf::EFetchStatus::Ok) { } } - result.GeneratorTime = TInstant::Now() - devnullStart; + result.GeneratorTime = GetThreadCPUTimeDelta(devnullStart); if (params.TestMode == ETestMode::GeneratorOnly) { return result; @@ -142,7 +142,7 @@ void RunTestSimple(const TRunParams& params, TTestResultCollector& printout) Cerr << "======== " << __func__ << ", keys: " << params.NumKeys << ", llvm: " << LLVM << ", mem limit: " << params.WideCombinerMemLimit << Endl; - if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory) { + if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory && !params.AlwaysSubprocess) { finalResult = RunTestOverGraph<LLVM>(params, true, false); } else { diff --git a/ydb/core/kqp/tools/combiner_perf/simple_block.cpp b/ydb/core/kqp/tools/combiner_perf/simple_block.cpp index be34720560d..fa1cb5b9079 100644 --- a/ydb/core/kqp/tools/combiner_perf/simple_block.cpp +++ b/ydb/core/kqp/tools/combiner_perf/simple_block.cpp @@ -70,16 +70,16 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool measureReferenc // Compute graph implementation auto stream = NUdf::TUnboxedValuePod(sampler->MakeStream(computeGraphPtr->GetContext()).Release()); computeGraphPtr->GetEntryPoint(0, true)->SetValue(computeGraphPtr->GetContext(), std::move(stream)); - const auto graphStart = TInstant::Now(); + const auto graphStart = GetThreadCPUTime(); resultList = computeGraphPtr->GetValue(); - return TInstant::Now() - graphStart; + return GetThreadCPUTimeDelta(graphStart); }; auto measureRefTime = [&](auto& computeGraphPtr) { // Reference implementation (sum via an std::unordered_map) - const auto cppStart = TInstant::Now(); + const auto cppStart = GetThreadCPUTime(); sampler->ComputeReferenceResult(computeGraphPtr->GetContext()); - return TInstant::Now() - cppStart; + return GetThreadCPUTimeDelta(cppStart); }; auto measureGeneratorTime = [&](auto& computeGraphPtr) { @@ -87,16 +87,16 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool measureReferenc const auto devnullStreamPtr = sampler->MakeStream(computeGraphPtr->GetContext()); auto& devnullStream = *devnullStreamPtr; size_t numBlocks = 0; - const auto timeStart = TInstant::Now(); + const auto timeStart = GetThreadCPUTime(); { NUdf::TUnboxedValue columns[3]; while (devnullStream.WideFetch(columns, 3) == NUdf::EFetchStatus::Ok) { ++numBlocks; } } - auto timeEnd = TInstant::Now(); + auto duration = GetThreadCPUTimeDelta(timeStart); Cerr << "Blocks generated: " << numBlocks << Endl; - return timeEnd - timeStart; + return duration; }; const auto graph = setup.BuildGraph(pgmReturn, {streamCallable}); @@ -153,7 +153,7 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params, TTestResultCollec Cerr << "======== " << __func__ << ", keys: " << params.NumKeys << ", block size: " << params.BlockSize << ", llvm: " << LLVM << Endl; - if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory) { + if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory && !params.AlwaysSubprocess) { finalResult = RunTestOverGraph<LLVM, Spilling>(params, false); } else { for (int i = 1; i <= params.NumAttempts; ++i) { diff --git a/ydb/core/kqp/tools/combiner_perf/simple_last.cpp b/ydb/core/kqp/tools/combiner_perf/simple_last.cpp index 33e871b45ed..413b438194e 100644 --- a/ydb/core/kqp/tools/combiner_perf/simple_last.cpp +++ b/ydb/core/kqp/tools/combiner_perf/simple_last.cpp @@ -26,14 +26,13 @@ namespace { TDuration MeasureGeneratorTime(IComputationGraph& graph, const IDataSampler& sampler) { const auto devnullStream = sampler.MakeStream(graph.GetHolderFactory()); - const auto devnullStart = TInstant::Now(); + const auto devnullStart = GetThreadCPUTime(); { NUdf::TUnboxedValue columns[2]; while (devnullStream->WideFetch(columns, 2) == NUdf::EFetchStatus::Ok) { } } - const auto devnullTime = TInstant::Now() - devnullStart; - return devnullTime; + return GetThreadCPUTimeDelta(devnullStart); } template<bool LLVM, bool Spilling> @@ -90,11 +89,11 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool needsVerificati // Compute node implementation Cerr << "Compute graph result" << Endl; - const auto graphTimeStart = TInstant::Now(); + const auto graphTimeStart = GetThreadCPUTime(); size_t lineCount = CountWideStreamOutputs<2>(computeGraphPtr->GetValue()); Cerr << lineCount << Endl; - return TInstant::Now() - graphTimeStart; + return GetThreadCPUTimeDelta(graphTimeStart); }; auto measureRefTime = [&](auto& computeGraphPtr, IDataSampler& sampler) { @@ -102,10 +101,10 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool needsVerificati Cerr << "Compute reference result" << Endl; auto referenceStream = sampler.MakeStream(computeGraphPtr->GetHolderFactory()); - const auto cppTimeStart = TInstant::Now(); + const auto cppTimeStart = GetThreadCPUTime(); sampler.ComputeReferenceResult(*referenceStream); - return TInstant::Now() - cppTimeStart; + return GetThreadCPUTimeDelta(cppTimeStart); }; auto graphRun1 = BuildGraph(setup, spillerFactory, *sampler); @@ -177,14 +176,14 @@ void RunTestCombineLastSimple(const TRunParams& params, TTestResultCollector& pr Cerr << "======== " << __func__ << ", keys: " << params.NumKeys << ", llvm: " << LLVM << ", spilling: " << Spilling << Endl; - if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory) { - finalResult = RunTestOverGraph<LLVM, Spilling>(params, true, false); + if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory && !params.AlwaysSubprocess) { + finalResult = RunTestOverGraph<LLVM, Spilling>(params, params.EnableVerification, false); } else { for (int i = 1; i <= params.NumAttempts; ++i) { Cerr << "------ Run " << i << " of " << params.NumAttempts << Endl; - const bool needsVerification = (i == 1); + const bool needsVerification = (i == 1) && params.EnableVerification; TRunResult runResult = RunForked([&]() { return RunTestOverGraph<LLVM, Spilling>(params, needsVerification, false); }); diff --git a/ydb/core/kqp/tools/combiner_perf/streams.cpp b/ydb/core/kqp/tools/combiner_perf/streams.cpp index 25b0427d16e..8e4c9ab1068 100644 --- a/ydb/core/kqp/tools/combiner_perf/streams.cpp +++ b/ydb/core/kqp/tools/combiner_perf/streams.cpp @@ -61,66 +61,116 @@ TString64Samples MakeKeyedString64Samples(const ui64 seed, const size_t numSampl return samples; } +template<typename K, typename V, typename R, typename Next> +THolder<R> DispatchByMap(EHashMapImpl implType, Next&& next) +{ + if (implType == EHashMapImpl::Absl) { + return next(TAbslMapImpl<K, V>()); + } else if (implType == EHashMapImpl::YqlRobinHood) { + return next(TRobinHoodMapImpl<K, V>()); + } else { + return next(TUnorderedMapImpl<K, V>()); + } +} + THolder<IDataSampler> CreateWideSamplerFromParams(const TRunParams& params) { Y_ENSURE(params.RandomSeed.has_value()); switch (params.SamplerType) { - case ESamplerType::StringKeysUI64Values: - return MakeHolder<TString64DataSampler>(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.NumRuns, params.LongStringKeys); - case ESamplerType::UI64KeysUI64Values: - return MakeHolder<T6464DataSampler>(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.NumRuns); + case ESamplerType::StringKeysUI64Values: { + auto next = [&](auto&& impl) { + using MapImpl = std::decay_t<decltype(impl)>; + using SamplerType = TString64DataSampler<MapImpl, MapImpl::TValueType::ArrayWidth>; + return MakeHolder<SamplerType>(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.NumRuns, params.LongStringKeys); + }; + if (params.NumAggregations == 1) { + return DispatchByMap<std::string, TValueWrapper<ui64, 1>, IDataSampler>(params.ReferenceHashType, next); + } else { + // TODO: support other NumAggregations values? + return DispatchByMap<std::string, TValueWrapper<ui64, 3>, IDataSampler>(params.ReferenceHashType, next); + } + } + case ESamplerType::UI64KeysUI64Values: { + auto next = [&](auto&& impl) { + using MapImpl = std::decay_t<decltype(impl)>; + using SamplerType = T6464DataSampler<MapImpl, MapImpl::TValueType::ArrayWidth>; + return MakeHolder<SamplerType>(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.NumRuns); + }; + if (params.NumAggregations == 1) { + return DispatchByMap<ui64, TValueWrapper<ui64, 1>, IDataSampler>(params.ReferenceHashType, next); + } else { + // TODO: support other NumAggregations values? + return DispatchByMap<ui64, TValueWrapper<ui64, 3>, IDataSampler>(params.ReferenceHashType, next); + } + } } } -template<typename K> -void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<K, ui64>& result); +template<typename TMapImpl, typename K> +struct TUpdateMapFromBlocks +{ + static void Update(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, typename TMapImpl::TMapType& result); +}; -template<> -void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<ui64, ui64>& result) +template<typename TMapImpl> +struct TUpdateMapFromBlocks<TMapImpl, ui64> { - auto datumKey = TArrowBlock::From(key).GetDatum(); - auto datumValue = TArrowBlock::From(value).GetDatum(); - UNIT_ASSERT(datumKey.is_array()); - UNIT_ASSERT(datumValue.is_array()); - - const auto ui64keys = datumKey.template array_as<arrow::UInt64Array>(); - const auto ui64values = datumValue.template array_as<arrow::UInt64Array>(); - UNIT_ASSERT(!!ui64keys); - UNIT_ASSERT(!!ui64values); - UNIT_ASSERT_VALUES_EQUAL(ui64keys->length(), ui64values->length()); - - const size_t length = ui64keys->length(); - for (size_t i = 0; i < length; ++i) { - result[ui64keys->Value(i)] += ui64values->Value(i); + static void Update(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, typename TMapImpl::TMapType& result) + { + auto datumKey = TArrowBlock::From(key).GetDatum(); + auto datumValue = TArrowBlock::From(value).GetDatum(); + UNIT_ASSERT(datumKey.is_array()); + UNIT_ASSERT(datumValue.is_array()); + + const auto ui64keys = datumKey.template array_as<arrow::UInt64Array>(); + const auto ui64values = datumValue.template array_as<arrow::UInt64Array>(); + UNIT_ASSERT(!!ui64keys); + UNIT_ASSERT(!!ui64values); + UNIT_ASSERT_VALUES_EQUAL(ui64keys->length(), ui64values->length()); + + const size_t length = ui64keys->length(); + for (size_t i = 0; i < length; ++i) { + if constexpr (!TMapImpl::CustomOps) { + result[ui64keys->Value(i)] += ui64values->Value(i); + } else { + TMapImpl::AggregateByKey(result, ui64keys->Value(i), ui64values->Value(i)); + } + } } -} +}; -template<> -void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<std::string, ui64>& result) +template<typename TMapImpl> +struct TUpdateMapFromBlocks<TMapImpl, std::string> { - auto datumKey = TArrowBlock::From(key).GetDatum(); - auto datumValue = TArrowBlock::From(value).GetDatum(); - UNIT_ASSERT(datumKey.is_arraylike()); - UNIT_ASSERT(datumValue.is_array()); - - const auto ui64values = datumValue.template array_as<arrow::UInt64Array>(); - UNIT_ASSERT(!!ui64values); - - int64_t valueOffset = 0; - - for (const auto& chunk : datumKey.chunks()) { - auto* barray = dynamic_cast<arrow::BinaryArray*>(chunk.get()); - UNIT_ASSERT(barray != nullptr); - for (int64_t i = 0; i < barray->length(); ++i) { - auto key = barray->GetString(i); - auto val = ui64values->Value(valueOffset); - result[key] += val; - ++valueOffset; + static void Update(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, typename TMapImpl::TMapType& result) + { + auto datumKey = TArrowBlock::From(key).GetDatum(); + auto datumValue = TArrowBlock::From(value).GetDatum(); + UNIT_ASSERT(datumKey.is_arraylike()); + UNIT_ASSERT(datumValue.is_array()); + + const auto ui64values = datumValue.template array_as<arrow::UInt64Array>(); + UNIT_ASSERT(!!ui64values); + + int64_t valueOffset = 0; + + for (const auto& chunk : datumKey.chunks()) { + auto* barray = dynamic_cast<arrow::BinaryArray*>(chunk.get()); + UNIT_ASSERT(barray != nullptr); + for (int64_t i = 0; i < barray->length(); ++i) { + auto key = barray->GetString(i); + auto val = ui64values->Value(valueOffset); + if constexpr (!TMapImpl::CustomOps) { + result[key] += val; + } else { + TMapImpl::AggregateByKey(result, key, val); + } + ++valueOffset; + } } } -} - +}; template<typename T> TType* GetVerySimpleDataType(const TTypeEnvironment& env) @@ -134,7 +184,7 @@ TType* GetVerySimpleDataType<std::string>(const TTypeEnvironment& env) return TDataType::Create(NUdf::TDataType<char*>::Id, env); } -template<typename K, typename V> +template<typename K, typename V, typename TMapImpl> class TBlockSampler : public IBlockSampler { using TSamples = TBlockKVStream<K, V>::TSamples; @@ -182,7 +232,7 @@ public: void ComputeReferenceResult(TComputationContext& ctx) override { - Y_ENSURE(RefResult.empty()); + Y_ENSURE(MapEmpty<TMapImpl>(RefResult)); const THolder<IWideStream> refStreamPtr = MakeStream(ctx); IWideStream& refStream = *refStreamPtr; @@ -190,27 +240,35 @@ public: NUdf::TUnboxedValue columns[3]; while (refStream.WideFetch(columns, 3) == NUdf::EFetchStatus::Ok) { - UpdateMapFromBlocks(columns[0], columns[1], RefResult); + TUpdateMapFromBlocks<TMapImpl, K>::Update(columns[0], columns[1], RefResult); } } void VerifyReferenceResultAgainstRaw() override { - Y_ENSURE(!RefResult.empty()); Y_ENSURE(!RawResult.empty()); // TODO: Replace UNIT_ASSERTS with something else, or actually set up the unit test thread context - UNIT_ASSERT_VALUES_EQUAL(RefResult.size(), RawResult.size()); - for (const auto& tuple : RawResult) { - auto otherIt = RefResult.find(tuple.first); - UNIT_ASSERT(otherIt != RefResult.end()); - UNIT_ASSERT_VALUES_EQUAL(tuple.second, otherIt->second); + if constexpr (!TMapImpl::CustomOps) { + UNIT_ASSERT_VALUES_EQUAL(RefResult.size(), RawResult.size()); + for (const auto& tuple : RawResult) { + auto otherIt = RefResult.find(tuple.first); + UNIT_ASSERT(otherIt != RefResult.end()); + UNIT_ASSERT_VALUES_EQUAL(tuple.second, otherIt->second); + } + } else { + UNIT_ASSERT_VALUES_EQUAL(RawResult.size(), TMapImpl::Size(RefResult)); + TMapImpl::IteratePairs(RefResult, [&](const K& k, const V& v) { + auto otherIt = RawResult.find(k); + UNIT_ASSERT(otherIt != RawResult.end()); + UNIT_ASSERT_VALUES_EQUAL(v, otherIt->second); + }); } } void VerifyGraphResultAgainstReference(const NUdf::TUnboxedValue& blockList) override { - Y_ENSURE(!RefResult.empty()); + Y_ENSURE(!MapEmpty<TMapImpl>(RefResult)); size_t numResultItems = blockList.GetListLength(); Cerr << "Result block count: " << numResultItems << Endl; @@ -224,15 +282,10 @@ public: const auto elements = ptr[i].GetElements(); - UpdateMapFromBlocks(elements[0], elements[1], graphResult); + TUpdateMapFromBlocks<TUnorderedMapImpl<K, V>, K>::Update(elements[0], elements[1], graphResult); } - UNIT_ASSERT_VALUES_EQUAL(RefResult.size(), graphResult.size()); - for (const auto& tuple : RefResult) { - auto graphIt = graphResult.find(tuple.first); - UNIT_ASSERT(graphIt != graphResult.end()); - UNIT_ASSERT_VALUES_EQUAL(tuple.second, graphIt->second); - } + VerifyMapsAreEqual<K, V, TMapImpl>(graphResult, RefResult); } private: @@ -241,7 +294,7 @@ private: size_t BlockSize; std::unordered_map<K, V> RawResult; - std::unordered_map<K, V> RefResult; + TMapImpl::TMapType RefResult; }; THolder<IBlockSampler> CreateBlockSamplerFromParams(const TRunParams& params) @@ -250,13 +303,21 @@ THolder<IBlockSampler> CreateBlockSamplerFromParams(const TRunParams& params) switch(params.SamplerType) { case ESamplerType::StringKeysUI64Values: - return MakeHolder<TBlockSampler<std::string, ui64>>( - params, - MakeKeyedString64Samples(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.LongStringKeys)); + return DispatchByMap<std::string, ui64, IBlockSampler>(params.ReferenceHashType, [&](auto&& impl) { + using MapImpl = std::decay_t<decltype(impl)>; + using SamplerType = TBlockSampler<std::string, ui64, MapImpl>; + return MakeHolder<SamplerType>( + params, + MakeKeyedString64Samples(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.LongStringKeys)); + }); case ESamplerType::UI64KeysUI64Values: - return MakeHolder<TBlockSampler<ui64, ui64>>( - params, - MakeKeyed6464Samples(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1)); + return DispatchByMap<ui64, ui64, IBlockSampler>(params.ReferenceHashType, [&](auto&& impl) { + using MapImpl = std::decay_t<decltype(impl)>; + using SamplerType = TBlockSampler<ui64, ui64, MapImpl>; + return MakeHolder<SamplerType>( + params, + MakeKeyed6464Samples(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1)); + }); } } diff --git a/ydb/core/kqp/tools/combiner_perf/streams.h b/ydb/core/kqp/tools/combiner_perf/streams.h index 6bb2ba8f59b..413400fa3b7 100644 --- a/ydb/core/kqp/tools/combiner_perf/streams.h +++ b/ydb/core/kqp/tools/combiner_perf/streams.h @@ -1,9 +1,11 @@ -// Sample data generators and streams +// Sample data generators, streams, and reference aggregation implementations #pragma once #include "converters.h" #include "run_params.h" +#include "value_wrapper.h" +#include "hashmaps.h" #include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h> #include <yql/essentials/minikql/computation/mkql_block_builder.h> @@ -17,6 +19,7 @@ T6464Samples MakeKeyed6464Samples(const ui64 seed, const size_t numSamples, cons using TString64Samples = std::vector<std::pair<std::string, ui64>>; TString64Samples MakeKeyedString64Samples(const ui64 seed, const size_t numSamples, const unsigned int maxKey, const bool longStrings); + struct IWideStream : public NUdf::TBoxedValue { NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final override { @@ -27,7 +30,7 @@ struct IWideStream : public NUdf::TBoxedValue NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) override = 0; }; -template<typename K, typename V, bool EmbeddedKeys> +template<typename K, typename V, bool EmbeddedKeys, size_t NumAggregates = 1> struct TKVStream : public IWideStream { using TSamples = std::vector<std::pair<K, V>>; @@ -61,13 +64,17 @@ public: CurrSample = Samples.begin(); } - if (width != 2) { - ythrow yexception() << "width 2 expected"; + if (width != 1 + NumAggregates) { + ythrow yexception() << "width " << (1 + NumAggregates) << " expected"; } // TODO: support embedded strings in values? NativeToUnboxed<EmbeddedKeys>(CurrSample->first, result[0]); - NativeToUnboxed<false>(CurrSample->second, result[1]); + NUdf::TUnboxedValuePod val; + NativeToUnboxed<false>(CurrSample->second, val); + for (size_t i = 0; i < NumAggregates; ++i) { + result[1 + i] = val; + } ++CurrSample; @@ -75,7 +82,8 @@ public: } }; -template<typename K, typename V> + +template<typename K, typename V, size_t NumAggregates> std::unordered_map<K, V> ComputeStreamSumResult(const NUdf::TUnboxedValue& wideStream) { std::unordered_map<K, V> expects; @@ -91,7 +99,10 @@ std::unordered_map<K, V> ComputeStreamSumResult(const NUdf::TUnboxedValue& wideS continue; // spilling combiners do yield sometimes } const K key = UnboxedToNative<K>(resultValues[0]); - const V value = UnboxedToNative<V>(resultValues[1]); + V value; + for (size_t i = 0; i < NumAggregates; ++i) { + value[i] = UnboxedToNative<typename V::TElement>(resultValues[1 + i]); + } expects[key] += value; ++numFetches; } @@ -100,53 +111,69 @@ std::unordered_map<K, V> ComputeStreamSumResult(const NUdf::TUnboxedValue& wideS return expects; } -// Direct version without the BoxedValueAccessor -template<typename K, typename V> -std::unordered_map<K, V> ComputeStreamSumResult(IWideStream& referenceStream) +// Direct version without the BoxedValueAccessor for use in reference timing measurements +template<typename K, typename V, size_t NumAggregates, typename TMapImpl> +void ComputeReferenceStreamSumResult(IWideStream& referenceStream, typename TMapImpl::TMapType& refResult) { - std::unordered_map<K, V> expects; - { - NUdf::TUnboxedValue inputs[2]; + NUdf::TUnboxedValue inputs[2]; - while (referenceStream.WideFetch(inputs, 2) == NUdf::EFetchStatus::Ok) { - expects[UnboxedToNative<K>(inputs[0])] += inputs[1].Get<V>(); + while (referenceStream.WideFetch(inputs, 1 + NumAggregates) == NUdf::EFetchStatus::Ok) { + V tmp; + for (size_t i = 0; i < V::ArrayWidth; ++i) { + tmp[i] = inputs[1 + i].Get<typename V::TElement>(); + } + if constexpr (!TMapImpl::CustomOps) { + refResult[UnboxedToNative<K>(inputs[0])] += tmp; + } else { + TMapImpl::AggregateByKey(refResult, UnboxedToNative<K>(inputs[0]), tmp); } } - return expects; } -template<typename K, typename V> -void VerifyMapsAreEqual(const std::unordered_map<K, V> computedMap, const std::unordered_map<K, V>& refMap) +template<typename K, typename V, typename TMapImpl> +void VerifyMapsAreEqual(const std::unordered_map<K, V> computedMap, const typename TMapImpl::TMapType& refMap) { - UNIT_ASSERT_VALUES_EQUAL(computedMap.size(), refMap.size()); - - for (const auto referencePair : refMap) { - const auto ii = computedMap.find(referencePair.first); - UNIT_ASSERT(ii != computedMap.end()); - UNIT_ASSERT_VALUES_EQUAL(referencePair.second, ii->second); + if constexpr (!TMapImpl::CustomOps) { + UNIT_ASSERT_VALUES_EQUAL(computedMap.size(), refMap.size()); + for (const auto referencePair : refMap) { + const auto ii = computedMap.find(referencePair.first); + UNIT_ASSERT(ii != computedMap.end()); + UNIT_ASSERT_VALUES_EQUAL(referencePair.second, ii->second); + } + } else { + UNIT_ASSERT_VALUES_EQUAL(computedMap.size(), TMapImpl::Size(refMap)); + TMapImpl::IteratePairs(refMap, [computedMap](const K& k, const V& v){ + const auto ii = computedMap.find(k); + UNIT_ASSERT(ii != computedMap.end()); + UNIT_ASSERT_VALUES_EQUAL(v, ii->second); + }); } } -template<typename K, typename V> -void VerifyStreamVsUnorderedMap(const NUdf::TUnboxedValue& wideStream, const std::unordered_map<K, V>& refMap) +template<typename TMapImpl, typename K, typename V, size_t NumAggregates> +void VerifyStreamVsMap(const NUdf::TUnboxedValue& wideStream, const typename TMapImpl::TMapType& refMap) { - std::unordered_map<K, V> resultMap = ComputeStreamSumResult<K, V>(wideStream); - VerifyMapsAreEqual(resultMap, refMap); + auto resultMap = ComputeStreamSumResult<K, V, NumAggregates>(wideStream); + VerifyMapsAreEqual<K, V, TMapImpl>(resultMap, refMap); } -template<typename K, typename V> -void VerifyListVsUnorderedMap(const NUdf::TUnboxedValue& pairList, const std::unordered_map<K, V>& refMap) +template<typename TMapImpl, typename K, typename V, size_t NumAggregates> +void VerifyListVsMap(const NUdf::TUnboxedValue& pairList, const typename TMapImpl::TMapType& refMap) { std::unordered_map<K, V> resultMap; + const auto ptr = pairList.GetElements(); for (size_t i = 0; i < pairList.GetListLength(); ++i) { const auto elements = ptr[i].GetElements(); const auto key = UnboxedToNative<K>(elements[0]); - const auto value = UnboxedToNative<V>(elements[1]); + V value; + for (size_t i = 0; i < NumAggregates; ++i) { + value[i] = UnboxedToNative<typename V::TElement>(elements[1 + i]); + } resultMap[key] += value; } - VerifyMapsAreEqual(resultMap, refMap); + VerifyMapsAreEqual<K, V, TMapImpl>(resultMap, refMap); } template<size_t Width> @@ -181,11 +208,12 @@ public: virtual std::string Describe() const = 0; }; +template<typename TMapImpl, size_t NumAggregates> class T6464DataSampler : public IDataSampler { public: TKVStream<ui64, ui64, false>::TSamples Samples; - std::unordered_map<ui64, ui64> Expects; + TMapImpl::TMapType Expects; size_t StreamNumIters = 0; @@ -197,7 +225,7 @@ public: THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override { - return THolder(new TKVStream<ui64, ui64, false>(holderFactory, Samples, StreamNumIters)); + return THolder(new TKVStream<ui64, ui64, false, NumAggregates>(holderFactory, Samples, StreamNumIters)); } TType* GetKeyType(TProgramBuilder& pb) const override @@ -207,32 +235,33 @@ public: void ComputeReferenceResult(IWideStream& referenceStream) override { - Y_ENSURE(Expects.empty()); + Y_ENSURE(MapEmpty<TMapImpl>(Expects)); - Expects = ComputeStreamSumResult<ui64, ui64>(referenceStream); + ComputeReferenceStreamSumResult<ui64, TValueWrapper<ui64, NumAggregates>, NumAggregates, TMapImpl>(referenceStream, Expects); } void VerifyStreamVsReference(const NUdf::TUnboxedValue& wideStream) const override { - VerifyStreamVsUnorderedMap(wideStream, Expects); + VerifyStreamVsMap<TMapImpl, ui64, TValueWrapper<ui64, NumAggregates>, NumAggregates>(wideStream, Expects); } void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override { - VerifyListVsUnorderedMap(value, Expects); + VerifyListVsMap<TMapImpl, ui64, TValueWrapper<ui64, NumAggregates>, NumAggregates>(value, Expects); } std::string Describe() const override { - return "ui64 keys, ui64 values"; + return std::string("keys: ui64, values: ") + TValueWrapper<ui64, NumAggregates>::Describe(); } }; +template<typename TMapImpl, size_t NumAggregates> class TString64DataSampler : public IDataSampler { public: - TKVStream<std::string, ui64, false>::TSamples Samples; - std::unordered_map<std::string, ui64> Expects; + TKVStream<std::string, ui64, false, NumAggregates>::TSamples Samples; + TMapImpl::TMapType Expects; size_t StreamNumIters = 0; bool LongStrings = false; @@ -247,9 +276,9 @@ public: THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override { if (LongStrings) { - return THolder(new TKVStream<std::string, ui64, false>(holderFactory, Samples, StreamNumIters)); + return THolder(new TKVStream<std::string, ui64, false, NumAggregates>(holderFactory, Samples, StreamNumIters)); } else { - return THolder(new TKVStream<std::string, ui64, true>(holderFactory, Samples, StreamNumIters)); + return THolder(new TKVStream<std::string, ui64, true, NumAggregates>(holderFactory, Samples, StreamNumIters)); } } @@ -260,24 +289,25 @@ public: void ComputeReferenceResult(IWideStream& referenceStream) override { - Y_ENSURE(Expects.empty()); + Y_ENSURE(MapEmpty<TMapImpl>(Expects)); - Expects = ComputeStreamSumResult<std::string, ui64>(referenceStream); + ComputeReferenceStreamSumResult<std::string, TValueWrapper<ui64, NumAggregates>, NumAggregates, TMapImpl>(referenceStream, Expects); } void VerifyStreamVsReference(const NUdf::TUnboxedValue& wideStream) const override { - VerifyStreamVsUnorderedMap(wideStream, Expects); + VerifyStreamVsMap<TMapImpl, std::string, TValueWrapper<ui64, NumAggregates>, NumAggregates>(wideStream, Expects); } void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override { - VerifyListVsUnorderedMap(value, Expects); + VerifyListVsMap<TMapImpl, std::string, TValueWrapper<ui64, NumAggregates>, NumAggregates>(value, Expects); } std::string Describe() const override { - return Sprintf("%s string keys, ui64 values", LongStrings ? "Long (24 byte)" : "Embedded"); + std::string valueDescr = TValueWrapper<ui64, NumAggregates>::Describe(); + return Sprintf("keys: %s string, values: %s", LongStrings ? "Long (24 byte)" : "Embedded", valueDescr.c_str()); } }; diff --git a/ydb/core/kqp/tools/combiner_perf/value_wrapper.h b/ydb/core/kqp/tools/combiner_perf/value_wrapper.h new file mode 100644 index 00000000000..e59481bf40e --- /dev/null +++ b/ydb/core/kqp/tools/combiner_perf/value_wrapper.h @@ -0,0 +1,59 @@ +#pragma once + +#include <util/generic/string.h> +#include <util/stream/output.h> +#include <util/string/printf.h> + +#include <array> +#include <string> + +namespace NKikimr { +namespace NMiniKQL { + +// A simple wrapper for aggregate sums over both scalar values and tuples. +// Just a statically-sized array for now. + +template<typename V, size_t Width> +struct TValueWrapper: public std::array<V, Width> +{ + using TElement = V; + constexpr static const size_t ArrayWidth = Width; + + TValueWrapper() + : std::array<V, Width>{} + { + }; + + TValueWrapper& operator+= (const TValueWrapper& rhs) + { + for (size_t i = 0; i < Width; ++i) { + (*this)[i] += rhs[i]; + } + return *this; + } + + static std::string Describe() + { + if constexpr (std::is_same_v<V, ui64>) { + return Sprintf("array<ui64, %lu>", Width); + } else if constexpr (std::is_same_v<V, std::string>) { + return Sprintf("array<string, %lu>", Width); + } else { + return Sprintf("array<?, %lu>", Width); + } + } +}; + +} +} + +template<typename V, size_t Width> +IOutputStream& operator << (IOutputStream& out Y_LIFETIME_BOUND, const NKikimr::NMiniKQL::TValueWrapper<V, Width>& wrapper) { + for (size_t i = 0; i < Width; ++i) { + if (i > 0) { + out << ", "; + } + out << wrapper[i]; + } + return out; +}
\ No newline at end of file |