aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPavel Zuev <pzuev@ydb.tech>2025-05-05 16:47:44 +0500
committerGitHub <noreply@github.com>2025-05-05 11:47:44 +0000
commit8e93406a08599cd346ecbe48e9034fae0fff2e7f (patch)
tree1c2f3d43f961c6b4c422a80b3c55a945ec26f60c
parenta8596d8b407dadf9e47f664cfd242b9a90f10967 (diff)
downloadydb-8e93406a08599cd346ecbe48e9034fae0fff2e7f.tar.gz
Support for selectable hashmap implementation in the combiner_perf (#18010)
-rwxr-xr-xydb/core/kqp/tools/combiner_perf/bin/format_markdown.py75
-rw-r--r--ydb/core/kqp/tools/combiner_perf/bin/main.cpp44
-rw-r--r--ydb/core/kqp/tools/combiner_perf/converters.h6
-rw-r--r--ydb/core/kqp/tools/combiner_perf/hashmaps.h136
-rw-r--r--ydb/core/kqp/tools/combiner_perf/printout.cpp39
-rw-r--r--ydb/core/kqp/tools/combiner_perf/printout.h8
-rw-r--r--ydb/core/kqp/tools/combiner_perf/run_params.h10
-rw-r--r--ydb/core/kqp/tools/combiner_perf/simple.cpp14
-rw-r--r--ydb/core/kqp/tools/combiner_perf/simple_block.cpp16
-rw-r--r--ydb/core/kqp/tools/combiner_perf/simple_last.cpp19
-rw-r--r--ydb/core/kqp/tools/combiner_perf/streams.cpp199
-rw-r--r--ydb/core/kqp/tools/combiner_perf/streams.h126
-rw-r--r--ydb/core/kqp/tools/combiner_perf/value_wrapper.h59
13 files changed, 585 insertions, 166 deletions
diff --git a/ydb/core/kqp/tools/combiner_perf/bin/format_markdown.py b/ydb/core/kqp/tools/combiner_perf/bin/format_markdown.py
index 586e87a24b8..cc11128c36d 100755
--- a/ydb/core/kqp/tools/combiner_perf/bin/format_markdown.py
+++ b/ydb/core/kqp/tools/combiner_perf/bin/format_markdown.py
@@ -38,18 +38,14 @@ def gen_chart(ref_time, graph_time, llvm_time = None):
out = ''
out += draw_line(ref_time, None, 'C++', None)
- if ref_time == 0:
- hue = None
- else:
- hue = 120 - int(120.0 * ((graph_time / ref_time) - 0.5) / 1.5) # Map [0.5, 2] -> [120, 0]
- hue = max(0, min(hue, 120)) # clamp to the [0, 120] range; hue 0 = red, hue 120 = green
-
def add_colored_bar(ref_value, result_value, title):
+ hue = None
+ comment = None
if result_value:
shame_ratio = result_value / ref_value
comment = 'x %.1f' % shame_ratio
- else:
- comment = None
+ hue = 120 - int(120.0 * (shame_ratio - 0.5) / 1.5) # Map [0.5, 2] -> [120, 0]
+ hue = max(0, min(hue, 120)) # clamp to the [0, 120] range; hue 0 = red, hue 120 = green
return draw_line(result_value, hue, title, comment)
out += add_colored_bar(ref_time, graph_time, 'Graph')
@@ -87,16 +83,22 @@ def format_time(ms):
return '%.2f' % (ms / 1000.0)
def format_mem(bytez):
+ if bytez is None:
+ return ' '
return '%.1f' % (bytez / (1024.0 * 1024.0))
def do_merge_llvm(samples):
- sorted_samples = sorted(samples, key=lambda sample: sample.get('llvm', False))
output_samples = []
+ for sample in samples:
+ if not sample.get('llvm', False):
+ output_samples.append(sample)
+
+ sorted_samples = sorted(samples, key=lambda sample: sample.get('llvm', False))
index = {}
for sample in sorted_samples:
is_llvm = sample.get('llvm', False)
- key = (sample['testName'], sample['numKeys'], sample_rows(sample), sample.get('spilling', False), sample.get('blockSize', 0), sample.get('combinerMemLimit', 0))
+ key = (sample['testName'], sample['numKeys'], sample_rows(sample), sample.get('spilling', False), sample.get('blockSize', 0), sample.get('combinerMemLimit', 0), sample['hashType'])
if key in index and not is_llvm:
raise Exception('Attempted to merge two non-LLVM result samples, key = %s' % repr(key))
if key not in index and is_llvm:
@@ -109,10 +111,48 @@ def do_merge_llvm(samples):
index[key]['llvmCleanTime'] = result_time_or_zero
else:
index[key] = sample
- output_samples.append(sample)
return output_samples
+def samples_sorted_by_section(samples):
+ sort_orders = {
+ 'testName': {},
+ 'totalRows': {},
+ 'spilling': {},
+ 'blockSize': {},
+ 'combinerMemLimit': {},
+ 'hashType': {},
+ 'numKeys': {},
+ }
+
+ for sample in samples:
+ for key in sort_orders.keys():
+ if key in sample:
+ so = sort_orders[key]
+ value = sample[key]
+ if value not in so:
+ so[value] = len(so)
+
+ def sort_order(sample):
+ result = []
+ for key in sort_orders.keys():
+ if key not in sample:
+ result.append(-1)
+ continue
+ result.append(sort_orders[key][sample[key]])
+ return result
+
+ return sorted(samples, key=sort_order)
+
+
+def friendly_hash_name(hash_type): # maps the JSON 'hashType' tag to a display name for the section title
+ types = {
+ 'std': 'std::unordered_map',
+ 'absl': 'absl::flat_hash_map',
+ }
+ return types.get(hash_type, hash_type) # tags without an entry (e.g. 'robinhood') are shown as-is instead of raising KeyError
+
+
def do_format(merge_llvm):
per_section = collections.defaultdict(list)
@@ -122,13 +162,17 @@ def do_format(merge_llvm):
if not line:
continue
sample = json.loads(line)
+ sample['totalRows'] = sample_rows(sample)
+ sample['hashType'] = sample.get('hashType', 'std')
all_samples.append(sample)
+ all_samples = samples_sorted_by_section(all_samples)
+
if merge_llvm:
all_samples = do_merge_llvm(all_samples)
for sample in all_samples:
- section_name = (sample['testName'], sample_rows(sample), sample.get('llvm', False), sample.get('spilling', False), sample.get('blockSize', 0), sample.get('combinerMemLimit', 0))
+ section_name = (sample['testName'], sample_rows(sample), sample.get('llvm', False), sample.get('spilling', False), sample.get('blockSize', 0), sample.get('combinerMemLimit', 0), sample['hashType'])
per_section[section_name].append(sample)
for _, samples in per_section.items():
@@ -150,6 +194,8 @@ def do_format(merge_llvm):
memlimit_formatted = format_mem(memlimit)
traits.append(f'{memlimit_formatted} MB RAM limit')
traits.append(f'{num_rows_formatted} input rows')
+ hash_name = friendly_hash_name(samples[0]['hashType'])
+ traits.append(f'{hash_name}')
traits_str = ', '.join(traits)
own_times = []
@@ -173,7 +219,7 @@ def do_format(merge_llvm):
]
headers += [
'MaxRSS delta, MB',
- 'Bytes per key',
+ 'Reference MaxRSS delta, MB',
]
print(''.join(['<th>%s</th>' % item for item in headers]) + '\n')
@@ -201,9 +247,8 @@ def do_format(merge_llvm):
if has_llvm_column:
cols.append(format_time(llvm_time_or_zero))
cols.append(format_mem(sample['maxRssDelta']))
- bytes_per_key = sample['maxRssDelta'] // sample['numKeys']
+ cols.append(format_mem(sample.get('referenceMaxRssDelta', None)))
- cols.append(str(bytes_per_key) if 0 < bytes_per_key < 10000 else ' ')
print('<tr>' + ''.join(['<td>%s</td>' % col for col in cols]) + '</tr>\n')
print('</table>\n:::\n')
diff --git a/ydb/core/kqp/tools/combiner_perf/bin/main.cpp b/ydb/core/kqp/tools/combiner_perf/bin/main.cpp
index 6871932de52..28a6df0c41b 100644
--- a/ydb/core/kqp/tools/combiner_perf/bin/main.cpp
+++ b/ydb/core/kqp/tools/combiner_perf/bin/main.cpp
@@ -19,6 +19,20 @@
using NKikimr::NMiniKQL::TRunParams;
+TStringBuf HashMapTypeName(NKikimr::NMiniKQL::EHashMapImpl implType)
+{
+ switch (implType) {
+ case NKikimr::NMiniKQL::EHashMapImpl::UnorderedMap:
+ return "std";
+ case NKikimr::NMiniKQL::EHashMapImpl::Absl:
+ return "absl";
+ case NKikimr::NMiniKQL::EHashMapImpl::YqlRobinHood:
+ return "robinhood";
+ default:
+ ythrow yexception() << "Unknown hashmap impl type";
+ }
+}
+
class TPrintingResultCollector : public TTestResultCollector
{
public:
@@ -38,6 +52,7 @@ public:
Cout << "Block size: " << runParams.BlockSize << Endl;
Cout << "Long strings: " << (runParams.LongStringKeys ? "yes" : "no") << Endl;
Cout << "Combiner mem limit: " << runParams.WideCombinerMemLimit << Endl;
+ Cout << "Hash map type: " << HashMapTypeName(runParams.ReferenceHashType) << Endl;
Cout << Endl;
Cout << "Graph runtime is: " << result.ResultTime << " vs. reference C++ implementation: " << result.ReferenceTime << Endl;
@@ -81,6 +96,7 @@ public:
out["longStringKeys"] = runParams.LongStringKeys;
out["numKeys"] = runParams.NumKeys;
out["combinerMemLimit"] = runParams.WideCombinerMemLimit;
+ out["hashType"] = HashMapTypeName(runParams.ReferenceHashType);
out["generatorTime"] = result.GeneratorTime.MilliSeconds();
out["resultTime"] = result.ResultTime.MilliSeconds();
@@ -99,9 +115,9 @@ void DoFullPass(TRunParams runParams, bool withSpilling)
TJsonResultCollector printout;
- // const std::vector<size_t> numKeys = {4u, 1000u, 100'000u, 1'000'000u, 10'000'000, 30'000'000};
- // const std::vector<size_t> numKeys = {60'000'000, 120'000'000};
- const std::vector<size_t> numKeys = {1'000'000u};
+ const std::vector<size_t> numKeys = {4u, 1000u, 100'000u, 1'000'000u, 10'000'000};
+ //const std::vector<size_t> numKeys = {60'000'000, 120'000'000};
+ //const std::vector<size_t> numKeys = {30'000'000u};
const std::vector<size_t> blockSizes = {128u, 8192u};
auto doSimple = [&printout, numKeys](const TRunParams& params) {
@@ -244,6 +260,23 @@ int main(int argc, const char* argv[])
})
.Help("Input data type: string key -> ui64 numeric value or ui64 numeric key -> ui64 numeric value");
+ options.AddLongOption("hashmap") // selects runParams.ReferenceHashType for the reference implementation
+ .Choices({"std", "absl", "robinhood"})
+ .RequiredArgument()
+ .Handler1([&](const NLastGetopt::TOptsParser* option) {
+ auto val = TStringBuf(option->CurVal());
+ if (val == "std") {
+ runParams.ReferenceHashType = NKikimr::NMiniKQL::EHashMapImpl::UnorderedMap;
+ } else if (val == "absl") {
+ runParams.ReferenceHashType = NKikimr::NMiniKQL::EHashMapImpl::Absl;
+ } else if (val == "robinhood") {
+ runParams.ReferenceHashType = NKikimr::NMiniKQL::EHashMapImpl::YqlRobinHood;
+ } else {
+ ythrow yexception() << "Unimplemented hash map type: " << val;
+ }
+ })
+ .Help("Reference hash map type: std (std::unordered_map), absl (absl::flat_hash_map) or robinhood (TRobinHoodHashFixedMap)");
+
options
.AddLongOption('t', "test")
.Choices({"combiner", "last-combiner", "block-combiner"})
@@ -262,6 +295,10 @@ int main(int argc, const char* argv[])
})
.Help("Enable single test run mode");
+ options.AddLongOption("no-verify").NoArgument().Handler0([&](){
+ runParams.EnableVerification = false;
+ });
+
options
.AddLongOption('m', "mode")
.Choices({"gen", "ref", "graph", "all"})
@@ -305,6 +342,7 @@ int main(int argc, const char* argv[])
if (testType != ETestType::All) {
DoSelectedTest(runParams, testType, llvm, spilling);
} else {
+ runParams.AlwaysSubprocess = true;
DoFullPass(runParams, spilling);
}
diff --git a/ydb/core/kqp/tools/combiner_perf/converters.h b/ydb/core/kqp/tools/combiner_perf/converters.h
index 76a76062ffd..1eefb65f64c 100644
--- a/ydb/core/kqp/tools/combiner_perf/converters.h
+++ b/ydb/core/kqp/tools/combiner_perf/converters.h
@@ -13,16 +13,16 @@ namespace NKikimr {
namespace NMiniKQL {
template<bool Embedded>
-void NativeToUnboxed(const ui64 value, NUdf::TUnboxedValue& result)
+void NativeToUnboxed(const ui64 value, NUdf::TUnboxedValuePod& result)
{
result = NUdf::TUnboxedValuePod(value);
}
template<bool Embedded>
-void NativeToUnboxed(const std::string& value, NUdf::TUnboxedValue& result)
+void NativeToUnboxed(const std::string& value, NUdf::TUnboxedValuePod& result)
{
if constexpr (Embedded) {
- result = NUdf::TUnboxedValue::Embedded(value);
+ result = NUdf::TUnboxedValuePod::Embedded(value);
} else {
result = NUdf::TUnboxedValuePod(NUdf::TStringValue(value));
}
diff --git a/ydb/core/kqp/tools/combiner_perf/hashmaps.h b/ydb/core/kqp/tools/combiner_perf/hashmaps.h
new file mode 100644
index 00000000000..255cf2470fc
--- /dev/null
+++ b/ydb/core/kqp/tools/combiner_perf/hashmaps.h
@@ -0,0 +1,136 @@
+// Hash map implementation wrappers
+
+#pragma once
+
+#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
+#include <yql/essentials/public/udf/udf_value.h>
+
+#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h>
+
+#include <unordered_map>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+template<typename K, typename V>
+struct TUnorderedMapImpl
+{
+ using TValueType = V;
+ using TMapType = std::unordered_map<K, V>;
+ constexpr static bool CustomOps = false;
+};
+
+template<typename K, typename V>
+struct TAbslMapImpl
+{
+ using TValueType = V;
+ using TMapType = absl::flat_hash_map<K, V>;
+ constexpr static bool CustomOps = false;
+};
+
+template<typename K, typename V, typename TEqualTo = std::equal_to<K>, typename THash = std::hash<K>>
+struct TRobinHoodMapImplBase
+{
+ // Warning: this implementation leaks long strings because it can't call destructors.
+ // Also it moves keys and values by simply copying bytes so take care.
+ using TValueType = V;
+ using TMapType = TRobinHoodHashFixedMap<K, V, TEqualTo, THash>;
+ constexpr static bool CustomOps = true;
+
+ static void AggregateByKey(TMapType& map, const K& key, const V& delta)
+ {
+ bool isNew = false;
+ auto ptr = map.Insert(key, isNew);
+ if (isNew) {
+ *(V*)map.GetMutablePayload(ptr) = delta;
+ map.CheckGrow();
+ } else {
+ *(V*)map.GetMutablePayload(ptr) += delta;
+ }
+ }
+
+ template<typename Callback>
+ static void IteratePairs(const TMapType& map, Callback&& callback)
+ {
+ // TODO: GetPayload and IsValid should be const
+ for (const char* iter = map.Begin(); iter != map.End(); map.Advance(iter)) {
+ if (!const_cast<TMapType&>(map).IsValid(iter)) {
+ continue;
+ }
+ const auto& key = map.GetKey(iter);
+ const auto& value = *(V*)(const_cast<TMapType&>(map)).GetPayload(iter);
+ callback(key, value);
+ }
+ }
+
+ static size_t Size(const TMapType& map)
+ {
+ return map.GetSize();
+ }
+};
+
+template<typename K, typename V>
+struct TRobinHoodMapImpl: public TRobinHoodMapImplBase<K, V>
+{
+};
+
+template<typename V>
+struct TRobinHoodMapImpl<std::string, V> // std::string keys are converted to TUnboxedValuePod before they reach the map
+{
+ using TValueType = V;
+
+ struct TEqualTo // compares two keys by their string contents
+ {
+ bool operator()(const NYql::NUdf::TUnboxedValuePod& lhs, const NYql::NUdf::TUnboxedValuePod& rhs) const {
+ return lhs.AsStringRef() == rhs.AsStringRef();
+ }
+ };
+
+ struct THash // hashes the key's string contents with absl::Hash
+ {
+ absl::Hash<std::string_view> AbslHash;
+
+ size_t operator()(const NYql::NUdf::TUnboxedValuePod& val) const {
+ auto result = AbslHash(val.AsStringRef());
+ return result;
+ }
+ };
+
+ using TBase = TRobinHoodMapImplBase<std::string, V>;
+ using TRealBase = TRobinHoodMapImplBase<NYql::NUdf::TUnboxedValuePod, V, TEqualTo, THash>;
+ using TMapType = TRealBase::TMapType;
+
+ constexpr static bool CustomOps = true;
+
+ static void AggregateByKey(TMapType& map, const std::string& key, const V& delta)
+ { // NOTE(review): Embedded() presumably only fits short strings - confirm behavior for long keys
+ NYql::NUdf::TUnboxedValuePod ub = NYql::NUdf::TUnboxedValuePod::Embedded(NYql::NUdf::TStringRef(key));
+ TRealBase::AggregateByKey(map, ub, delta);
+ }
+
+ template<typename Callback>
+ static void IteratePairs(const TMapType& map, Callback&& callback)
+ { // the adapter lambda is only invoked inside this call, so the callback is referenced rather than copied
+ TRealBase::IteratePairs(map, [&callback](const NYql::NUdf::TUnboxedValuePod& k, const V& v) {
+ callback(std::string(k.AsStringRef()), v);
+ });
+ }
+
+ static size_t Size(const TMapType& map)
+ {
+ return TRealBase::Size(map);
+ }
+};
+
+template<typename TMapImpl>
+bool MapEmpty(const typename TMapImpl::TMapType& map)
+{
+ if constexpr (TMapImpl::CustomOps) {
+ return TMapImpl::Size(map) == 0;
+ } else {
+ return map.empty();
+ }
+}
+
+}
+} \ No newline at end of file
diff --git a/ydb/core/kqp/tools/combiner_perf/printout.cpp b/ydb/core/kqp/tools/combiner_perf/printout.cpp
index 2c4524f8afc..137a3bea441 100644
--- a/ydb/core/kqp/tools/combiner_perf/printout.cpp
+++ b/ydb/core/kqp/tools/combiner_perf/printout.cpp
@@ -2,6 +2,10 @@
#include <sys/resource.h>
+#if defined (_darwin_)
+#include <mach/mach.h>
+#endif
+
long GetMaxRSS()
{
rusage usage;
@@ -16,7 +20,7 @@ long GetMaxRSSDelta(const long prevMaxRss)
long maxRss = GetMaxRSS();
if (maxRss <= 0 || prevMaxRss <= 0 || maxRss < prevMaxRss) {
- ythrow yexception() << "Bad maxRSS measurement, before: " << prevMaxRss << ", after: " << maxRss << Endl;
+ ythrow yexception() << "Bad maxRSS measurement, before: " << prevMaxRss << ", after: " << maxRss << Endl;
}
#if defined (_darwin_)
@@ -28,6 +32,39 @@ long GetMaxRSSDelta(const long prevMaxRss)
return (maxRss - prevMaxRss) * factor;
}
+TDuration GetThreadCPUTime() // CPU time of the calling thread as a TDuration; throws yexception when no reading is obtained
+{
+ ui64 us = 0;
+
+#if defined (_darwin_)
+ thread_basic_info_data_t info = {};
+ mach_msg_type_number_t info_count = THREAD_BASIC_INFO_COUNT;
+ kern_return_t kern_err = thread_info(
+ mach_thread_self(), // NOTE(review): believed to return a new port right on each call that is never released via mach_port_deallocate() - confirm
+ THREAD_BASIC_INFO,
+ (thread_info_t)&info,
+ &info_count
+ );
+ if (kern_err == KERN_SUCCESS) { // on failure `us` stays 0 and the check below throws
+ us = 1000000ULL * info.user_time.seconds + info.user_time.microseconds + 1000000ULL*info.system_time.seconds + info.system_time.microseconds;
+ }
+#else
+ us = ThreadCPUTime(); // non-Darwin path: value is used as microseconds below
+#endif
+
+ if (!us) { // a zero reading is treated as a measurement failure
+ ythrow yexception() << "Failed to obtain thread CPU time";
+ }
+
+ return TDuration::MicroSeconds(us);
+}
+
+TDuration GetThreadCPUTimeDelta(const TDuration startTime)
+{
+ TDuration current = GetThreadCPUTime();
+ return current - startTime;
+}
+
void MergeRunResults(const TRunResult& src, TRunResult& dst)
{
dst.ResultTime = Min(src.ResultTime, dst.ResultTime);
diff --git a/ydb/core/kqp/tools/combiner_perf/printout.h b/ydb/core/kqp/tools/combiner_perf/printout.h
index 1cb02b9a3a9..8c700bd6e91 100644
--- a/ydb/core/kqp/tools/combiner_perf/printout.h
+++ b/ydb/core/kqp/tools/combiner_perf/printout.h
@@ -40,6 +40,10 @@ public:
};
// ru_maxrss from rusage, converted to bytes
-long GetMaxRSS();
+long Y_NO_INLINE GetMaxRSS();
-long GetMaxRSSDelta(const long prevMaxRss); \ No newline at end of file
+long Y_NO_INLINE GetMaxRSSDelta(const long prevMaxRss);
+
+TDuration Y_NO_INLINE GetThreadCPUTime();
+
+TDuration Y_NO_INLINE GetThreadCPUTimeDelta(const TDuration startTime); \ No newline at end of file
diff --git a/ydb/core/kqp/tools/combiner_perf/run_params.h b/ydb/core/kqp/tools/combiner_perf/run_params.h
index 406977b1a19..9f6b8a5161a 100644
--- a/ydb/core/kqp/tools/combiner_perf/run_params.h
+++ b/ydb/core/kqp/tools/combiner_perf/run_params.h
@@ -18,9 +18,16 @@ enum class ESamplerType {
UI64KeysUI64Values,
};
+enum class EHashMapImpl {
+ UnorderedMap,
+ Absl,
+ YqlRobinHood,
+};
+
struct TRunParams {
ETestMode TestMode = ETestMode::Full;
ESamplerType SamplerType = ESamplerType::StringKeysUI64Values;
+ EHashMapImpl ReferenceHashType = EHashMapImpl::UnorderedMap;
std::optional<ui64> RandomSeed;
int NumAttempts = 0;
size_t RowsPerRun = 0;
@@ -28,8 +35,11 @@ struct TRunParams {
size_t NumKeys = 0; // for numeric keys, the range is [0..NumKeys-1]
size_t BlockSize = 0;
size_t WideCombinerMemLimit = 0;
+ size_t NumAggregations = 1;
bool LongStringKeys = false;
bool MeasureReferenceMemory = false;
+ bool AlwaysSubprocess = false;
+ bool EnableVerification = true;
};
}
diff --git a/ydb/core/kqp/tools/combiner_perf/simple.cpp b/ydb/core/kqp/tools/combiner_perf/simple.cpp
index 435df4bbbc8..024a7fc9bca 100644
--- a/ydb/core/kqp/tools/combiner_perf/simple.cpp
+++ b/ydb/core/kqp/tools/combiner_perf/simple.cpp
@@ -63,11 +63,11 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool needsVerificati
// Compute node implementation
Cerr << "Compute graph result" << Endl;
- const auto graphTimeStart = TInstant::Now();
+ const auto graphTimeStart = GetThreadCPUTime();
size_t lineCount = CountWideStreamOutputs<2>(computeGraphPtr->GetValue());
Cerr << lineCount << Endl;
- return TInstant::Now() - graphTimeStart;
+ return GetThreadCPUTimeDelta(graphTimeStart);
};
auto measureRefTime = [&](auto& computeGraphPtr, IDataSampler& sampler) {
@@ -75,10 +75,10 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool needsVerificati
Cerr << "Compute reference result" << Endl;
auto referenceStream = sampler.MakeStream(computeGraphPtr->GetHolderFactory());
- const auto cppTimeStart = TInstant::Now();
+ const auto cppTimeStart = GetThreadCPUTime();
sampler.ComputeReferenceResult(*referenceStream);
- return TInstant::Now() - cppTimeStart;
+ return GetThreadCPUTimeDelta(cppTimeStart);
};
auto graphRun1 = BuildGraph(setup, *sampler, params.WideCombinerMemLimit);
@@ -106,13 +106,13 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool needsVerificati
// Measure the input stream run time
Cerr << "Generator run" << Endl;
const auto devnullStream = sampler->MakeStream(graphRun1->GetHolderFactory());
- const auto devnullStart = TInstant::Now();
+ const auto devnullStart = GetThreadCPUTime();
{
NUdf::TUnboxedValue columns[2];
while (devnullStream->WideFetch(columns, 2) == NUdf::EFetchStatus::Ok) {
}
}
- result.GeneratorTime = TInstant::Now() - devnullStart;
+ result.GeneratorTime = GetThreadCPUTimeDelta(devnullStart);
if (params.TestMode == ETestMode::GeneratorOnly) {
return result;
@@ -142,7 +142,7 @@ void RunTestSimple(const TRunParams& params, TTestResultCollector& printout)
Cerr << "======== " << __func__ << ", keys: " << params.NumKeys << ", llvm: " << LLVM << ", mem limit: " << params.WideCombinerMemLimit << Endl;
- if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory) {
+ if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory && !params.AlwaysSubprocess) {
finalResult = RunTestOverGraph<LLVM>(params, true, false);
}
else {
diff --git a/ydb/core/kqp/tools/combiner_perf/simple_block.cpp b/ydb/core/kqp/tools/combiner_perf/simple_block.cpp
index be34720560d..fa1cb5b9079 100644
--- a/ydb/core/kqp/tools/combiner_perf/simple_block.cpp
+++ b/ydb/core/kqp/tools/combiner_perf/simple_block.cpp
@@ -70,16 +70,16 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool measureReferenc
// Compute graph implementation
auto stream = NUdf::TUnboxedValuePod(sampler->MakeStream(computeGraphPtr->GetContext()).Release());
computeGraphPtr->GetEntryPoint(0, true)->SetValue(computeGraphPtr->GetContext(), std::move(stream));
- const auto graphStart = TInstant::Now();
+ const auto graphStart = GetThreadCPUTime();
resultList = computeGraphPtr->GetValue();
- return TInstant::Now() - graphStart;
+ return GetThreadCPUTimeDelta(graphStart);
};
auto measureRefTime = [&](auto& computeGraphPtr) {
// Reference implementation (sum via an std::unordered_map)
- const auto cppStart = TInstant::Now();
+ const auto cppStart = GetThreadCPUTime();
sampler->ComputeReferenceResult(computeGraphPtr->GetContext());
- return TInstant::Now() - cppStart;
+ return GetThreadCPUTimeDelta(cppStart);
};
auto measureGeneratorTime = [&](auto& computeGraphPtr) {
@@ -87,16 +87,16 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool measureReferenc
const auto devnullStreamPtr = sampler->MakeStream(computeGraphPtr->GetContext());
auto& devnullStream = *devnullStreamPtr;
size_t numBlocks = 0;
- const auto timeStart = TInstant::Now();
+ const auto timeStart = GetThreadCPUTime();
{
NUdf::TUnboxedValue columns[3];
while (devnullStream.WideFetch(columns, 3) == NUdf::EFetchStatus::Ok) {
++numBlocks;
}
}
- auto timeEnd = TInstant::Now();
+ auto duration = GetThreadCPUTimeDelta(timeStart);
Cerr << "Blocks generated: " << numBlocks << Endl;
- return timeEnd - timeStart;
+ return duration;
};
const auto graph = setup.BuildGraph(pgmReturn, {streamCallable});
@@ -153,7 +153,7 @@ void RunTestBlockCombineHashedSimple(const TRunParams& params, TTestResultCollec
Cerr << "======== " << __func__ << ", keys: " << params.NumKeys << ", block size: " << params.BlockSize << ", llvm: " << LLVM << Endl;
- if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory) {
+ if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory && !params.AlwaysSubprocess) {
finalResult = RunTestOverGraph<LLVM, Spilling>(params, false);
} else {
for (int i = 1; i <= params.NumAttempts; ++i) {
diff --git a/ydb/core/kqp/tools/combiner_perf/simple_last.cpp b/ydb/core/kqp/tools/combiner_perf/simple_last.cpp
index 33e871b45ed..413b438194e 100644
--- a/ydb/core/kqp/tools/combiner_perf/simple_last.cpp
+++ b/ydb/core/kqp/tools/combiner_perf/simple_last.cpp
@@ -26,14 +26,13 @@ namespace {
TDuration MeasureGeneratorTime(IComputationGraph& graph, const IDataSampler& sampler)
{
const auto devnullStream = sampler.MakeStream(graph.GetHolderFactory());
- const auto devnullStart = TInstant::Now();
+ const auto devnullStart = GetThreadCPUTime();
{
NUdf::TUnboxedValue columns[2];
while (devnullStream->WideFetch(columns, 2) == NUdf::EFetchStatus::Ok) {
}
}
- const auto devnullTime = TInstant::Now() - devnullStart;
- return devnullTime;
+ return GetThreadCPUTimeDelta(devnullStart);
}
template<bool LLVM, bool Spilling>
@@ -90,11 +89,11 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool needsVerificati
// Compute node implementation
Cerr << "Compute graph result" << Endl;
- const auto graphTimeStart = TInstant::Now();
+ const auto graphTimeStart = GetThreadCPUTime();
size_t lineCount = CountWideStreamOutputs<2>(computeGraphPtr->GetValue());
Cerr << lineCount << Endl;
- return TInstant::Now() - graphTimeStart;
+ return GetThreadCPUTimeDelta(graphTimeStart);
};
auto measureRefTime = [&](auto& computeGraphPtr, IDataSampler& sampler) {
@@ -102,10 +101,10 @@ TRunResult RunTestOverGraph(const TRunParams& params, const bool needsVerificati
Cerr << "Compute reference result" << Endl;
auto referenceStream = sampler.MakeStream(computeGraphPtr->GetHolderFactory());
- const auto cppTimeStart = TInstant::Now();
+ const auto cppTimeStart = GetThreadCPUTime();
sampler.ComputeReferenceResult(*referenceStream);
- return TInstant::Now() - cppTimeStart;
+ return GetThreadCPUTimeDelta(cppTimeStart);
};
auto graphRun1 = BuildGraph(setup, spillerFactory, *sampler);
@@ -177,14 +176,14 @@ void RunTestCombineLastSimple(const TRunParams& params, TTestResultCollector& pr
Cerr << "======== " << __func__ << ", keys: " << params.NumKeys << ", llvm: " << LLVM << ", spilling: " << Spilling << Endl;
- if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory) {
- finalResult = RunTestOverGraph<LLVM, Spilling>(params, true, false);
+ if (params.NumAttempts <= 1 && !params.MeasureReferenceMemory && !params.AlwaysSubprocess) {
+ finalResult = RunTestOverGraph<LLVM, Spilling>(params, params.EnableVerification, false);
}
else {
for (int i = 1; i <= params.NumAttempts; ++i) {
Cerr << "------ Run " << i << " of " << params.NumAttempts << Endl;
- const bool needsVerification = (i == 1);
+ const bool needsVerification = (i == 1) && params.EnableVerification;
TRunResult runResult = RunForked([&]() {
return RunTestOverGraph<LLVM, Spilling>(params, needsVerification, false);
});
diff --git a/ydb/core/kqp/tools/combiner_perf/streams.cpp b/ydb/core/kqp/tools/combiner_perf/streams.cpp
index 25b0427d16e..8e4c9ab1068 100644
--- a/ydb/core/kqp/tools/combiner_perf/streams.cpp
+++ b/ydb/core/kqp/tools/combiner_perf/streams.cpp
@@ -61,66 +61,116 @@ TString64Samples MakeKeyedString64Samples(const ui64 seed, const size_t numSampl
return samples;
}
+template<typename K, typename V, typename R, typename Next>
+THolder<R> DispatchByMap(EHashMapImpl implType, Next&& next)
+{
+ if (implType == EHashMapImpl::Absl) {
+ return next(TAbslMapImpl<K, V>());
+ } else if (implType == EHashMapImpl::YqlRobinHood) {
+ return next(TRobinHoodMapImpl<K, V>());
+ } else {
+ return next(TUnorderedMapImpl<K, V>());
+ }
+}
+
THolder<IDataSampler> CreateWideSamplerFromParams(const TRunParams& params)
{
Y_ENSURE(params.RandomSeed.has_value());
switch (params.SamplerType) {
- case ESamplerType::StringKeysUI64Values:
- return MakeHolder<TString64DataSampler>(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.NumRuns, params.LongStringKeys);
- case ESamplerType::UI64KeysUI64Values:
- return MakeHolder<T6464DataSampler>(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.NumRuns);
+ case ESamplerType::StringKeysUI64Values: {
+ auto next = [&](auto&& impl) {
+ using MapImpl = std::decay_t<decltype(impl)>;
+ using SamplerType = TString64DataSampler<MapImpl, MapImpl::TValueType::ArrayWidth>;
+ return MakeHolder<SamplerType>(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.NumRuns, params.LongStringKeys);
+ };
+ if (params.NumAggregations == 1) {
+ return DispatchByMap<std::string, TValueWrapper<ui64, 1>, IDataSampler>(params.ReferenceHashType, next);
+ } else {
+ // TODO: support other NumAggregations values?
+ return DispatchByMap<std::string, TValueWrapper<ui64, 3>, IDataSampler>(params.ReferenceHashType, next);
+ }
+ }
+ case ESamplerType::UI64KeysUI64Values: {
+ auto next = [&](auto&& impl) {
+ using MapImpl = std::decay_t<decltype(impl)>;
+ using SamplerType = T6464DataSampler<MapImpl, MapImpl::TValueType::ArrayWidth>;
+ return MakeHolder<SamplerType>(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.NumRuns);
+ };
+ if (params.NumAggregations == 1) {
+ return DispatchByMap<ui64, TValueWrapper<ui64, 1>, IDataSampler>(params.ReferenceHashType, next);
+ } else {
+ // TODO: support other NumAggregations values?
+ return DispatchByMap<ui64, TValueWrapper<ui64, 3>, IDataSampler>(params.ReferenceHashType, next);
+ }
+ }
}
}
-template<typename K>
-void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<K, ui64>& result);
+template<typename TMapImpl, typename K>
+struct TUpdateMapFromBlocks
+{
+ static void Update(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, typename TMapImpl::TMapType& result);
+};
-template<>
-void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<ui64, ui64>& result)
+template<typename TMapImpl>
+struct TUpdateMapFromBlocks<TMapImpl, ui64>
{
- auto datumKey = TArrowBlock::From(key).GetDatum();
- auto datumValue = TArrowBlock::From(value).GetDatum();
- UNIT_ASSERT(datumKey.is_array());
- UNIT_ASSERT(datumValue.is_array());
-
- const auto ui64keys = datumKey.template array_as<arrow::UInt64Array>();
- const auto ui64values = datumValue.template array_as<arrow::UInt64Array>();
- UNIT_ASSERT(!!ui64keys);
- UNIT_ASSERT(!!ui64values);
- UNIT_ASSERT_VALUES_EQUAL(ui64keys->length(), ui64values->length());
-
- const size_t length = ui64keys->length();
- for (size_t i = 0; i < length; ++i) {
- result[ui64keys->Value(i)] += ui64values->Value(i);
+ static void Update(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, typename TMapImpl::TMapType& result)
+ {
+ auto datumKey = TArrowBlock::From(key).GetDatum();
+ auto datumValue = TArrowBlock::From(value).GetDatum();
+ UNIT_ASSERT(datumKey.is_array());
+ UNIT_ASSERT(datumValue.is_array());
+
+ const auto ui64keys = datumKey.template array_as<arrow::UInt64Array>();
+ const auto ui64values = datumValue.template array_as<arrow::UInt64Array>();
+ UNIT_ASSERT(!!ui64keys);
+ UNIT_ASSERT(!!ui64values);
+ UNIT_ASSERT_VALUES_EQUAL(ui64keys->length(), ui64values->length());
+
+ const size_t length = ui64keys->length();
+ for (size_t i = 0; i < length; ++i) {
+ if constexpr (!TMapImpl::CustomOps) {
+ result[ui64keys->Value(i)] += ui64values->Value(i);
+ } else {
+ TMapImpl::AggregateByKey(result, ui64keys->Value(i), ui64values->Value(i));
+ }
+ }
}
-}
+};
-template<>
-void UpdateMapFromBlocks(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, std::unordered_map<std::string, ui64>& result)
+template<typename TMapImpl>
+struct TUpdateMapFromBlocks<TMapImpl, std::string>
{
- auto datumKey = TArrowBlock::From(key).GetDatum();
- auto datumValue = TArrowBlock::From(value).GetDatum();
- UNIT_ASSERT(datumKey.is_arraylike());
- UNIT_ASSERT(datumValue.is_array());
-
- const auto ui64values = datumValue.template array_as<arrow::UInt64Array>();
- UNIT_ASSERT(!!ui64values);
-
- int64_t valueOffset = 0;
-
- for (const auto& chunk : datumKey.chunks()) {
- auto* barray = dynamic_cast<arrow::BinaryArray*>(chunk.get());
- UNIT_ASSERT(barray != nullptr);
- for (int64_t i = 0; i < barray->length(); ++i) {
- auto key = barray->GetString(i);
- auto val = ui64values->Value(valueOffset);
- result[key] += val;
- ++valueOffset;
+ static void Update(const NUdf::TUnboxedValue& key, const NUdf::TUnboxedValue& value, typename TMapImpl::TMapType& result)
+ {
+ auto datumKey = TArrowBlock::From(key).GetDatum();
+ auto datumValue = TArrowBlock::From(value).GetDatum();
+ UNIT_ASSERT(datumKey.is_arraylike());
+ UNIT_ASSERT(datumValue.is_array());
+
+ const auto ui64values = datumValue.template array_as<arrow::UInt64Array>();
+ UNIT_ASSERT(!!ui64values);
+
+ int64_t valueOffset = 0;
+
+ for (const auto& chunk : datumKey.chunks()) {
+ auto* barray = dynamic_cast<arrow::BinaryArray*>(chunk.get());
+ UNIT_ASSERT(barray != nullptr);
+ for (int64_t i = 0; i < barray->length(); ++i) {
+ auto key = barray->GetString(i);
+ auto val = ui64values->Value(valueOffset);
+ if constexpr (!TMapImpl::CustomOps) {
+ result[key] += val;
+ } else {
+ TMapImpl::AggregateByKey(result, key, val);
+ }
+ ++valueOffset;
+ }
}
}
-}
-
+};
template<typename T>
TType* GetVerySimpleDataType(const TTypeEnvironment& env)
@@ -134,7 +184,7 @@ TType* GetVerySimpleDataType<std::string>(const TTypeEnvironment& env)
return TDataType::Create(NUdf::TDataType<char*>::Id, env);
}
-template<typename K, typename V>
+template<typename K, typename V, typename TMapImpl>
class TBlockSampler : public IBlockSampler
{
using TSamples = TBlockKVStream<K, V>::TSamples;
@@ -182,7 +232,7 @@ public:
void ComputeReferenceResult(TComputationContext& ctx) override
{
- Y_ENSURE(RefResult.empty());
+ Y_ENSURE(MapEmpty<TMapImpl>(RefResult));
const THolder<IWideStream> refStreamPtr = MakeStream(ctx);
IWideStream& refStream = *refStreamPtr;
@@ -190,27 +240,35 @@ public:
NUdf::TUnboxedValue columns[3];
while (refStream.WideFetch(columns, 3) == NUdf::EFetchStatus::Ok) {
- UpdateMapFromBlocks(columns[0], columns[1], RefResult);
+ TUpdateMapFromBlocks<TMapImpl, K>::Update(columns[0], columns[1], RefResult);
}
}
void VerifyReferenceResultAgainstRaw() override
{
- Y_ENSURE(!RefResult.empty());
Y_ENSURE(!RawResult.empty());
// TODO: Replace UNIT_ASSERTS with something else, or actually set up the unit test thread context
- UNIT_ASSERT_VALUES_EQUAL(RefResult.size(), RawResult.size());
- for (const auto& tuple : RawResult) {
- auto otherIt = RefResult.find(tuple.first);
- UNIT_ASSERT(otherIt != RefResult.end());
- UNIT_ASSERT_VALUES_EQUAL(tuple.second, otherIt->second);
+ if constexpr (!TMapImpl::CustomOps) {
+ UNIT_ASSERT_VALUES_EQUAL(RefResult.size(), RawResult.size());
+ for (const auto& tuple : RawResult) {
+ auto otherIt = RefResult.find(tuple.first);
+ UNIT_ASSERT(otherIt != RefResult.end());
+ UNIT_ASSERT_VALUES_EQUAL(tuple.second, otherIt->second);
+ }
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(RawResult.size(), TMapImpl::Size(RefResult));
+ TMapImpl::IteratePairs(RefResult, [&](const K& k, const V& v) {
+ auto otherIt = RawResult.find(k);
+ UNIT_ASSERT(otherIt != RawResult.end());
+ UNIT_ASSERT_VALUES_EQUAL(v, otherIt->second);
+ });
}
}
void VerifyGraphResultAgainstReference(const NUdf::TUnboxedValue& blockList) override
{
- Y_ENSURE(!RefResult.empty());
+ Y_ENSURE(!MapEmpty<TMapImpl>(RefResult));
size_t numResultItems = blockList.GetListLength();
Cerr << "Result block count: " << numResultItems << Endl;
@@ -224,15 +282,10 @@ public:
const auto elements = ptr[i].GetElements();
- UpdateMapFromBlocks(elements[0], elements[1], graphResult);
+ TUpdateMapFromBlocks<TUnorderedMapImpl<K, V>, K>::Update(elements[0], elements[1], graphResult);
}
- UNIT_ASSERT_VALUES_EQUAL(RefResult.size(), graphResult.size());
- for (const auto& tuple : RefResult) {
- auto graphIt = graphResult.find(tuple.first);
- UNIT_ASSERT(graphIt != graphResult.end());
- UNIT_ASSERT_VALUES_EQUAL(tuple.second, graphIt->second);
- }
+ VerifyMapsAreEqual<K, V, TMapImpl>(graphResult, RefResult);
}
private:
@@ -241,7 +294,7 @@ private:
size_t BlockSize;
std::unordered_map<K, V> RawResult;
- std::unordered_map<K, V> RefResult;
+ TMapImpl::TMapType RefResult;
};
THolder<IBlockSampler> CreateBlockSamplerFromParams(const TRunParams& params)
@@ -250,13 +303,21 @@ THolder<IBlockSampler> CreateBlockSamplerFromParams(const TRunParams& params)
switch(params.SamplerType) {
case ESamplerType::StringKeysUI64Values:
- return MakeHolder<TBlockSampler<std::string, ui64>>(
- params,
- MakeKeyedString64Samples(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.LongStringKeys));
+ return DispatchByMap<std::string, ui64, IBlockSampler>(params.ReferenceHashType, [&](auto&& impl) {
+ using MapImpl = std::decay_t<decltype(impl)>;
+ using SamplerType = TBlockSampler<std::string, ui64, MapImpl>;
+ return MakeHolder<SamplerType>(
+ params,
+ MakeKeyedString64Samples(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1, params.LongStringKeys));
+ });
case ESamplerType::UI64KeysUI64Values:
- return MakeHolder<TBlockSampler<ui64, ui64>>(
- params,
- MakeKeyed6464Samples(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1));
+ return DispatchByMap<ui64, ui64, IBlockSampler>(params.ReferenceHashType, [&](auto&& impl) {
+ using MapImpl = std::decay_t<decltype(impl)>;
+ using SamplerType = TBlockSampler<ui64, ui64, MapImpl>;
+ return MakeHolder<SamplerType>(
+ params,
+ MakeKeyed6464Samples(*params.RandomSeed, params.RowsPerRun, params.NumKeys - 1));
+ });
}
}
diff --git a/ydb/core/kqp/tools/combiner_perf/streams.h b/ydb/core/kqp/tools/combiner_perf/streams.h
index 6bb2ba8f59b..413400fa3b7 100644
--- a/ydb/core/kqp/tools/combiner_perf/streams.h
+++ b/ydb/core/kqp/tools/combiner_perf/streams.h
@@ -1,9 +1,11 @@
-// Sample data generators and streams
+// Sample data generators, streams, and reference aggregation implementations
#pragma once
#include "converters.h"
#include "run_params.h"
+#include "value_wrapper.h"
+#include "hashmaps.h"
#include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h>
#include <yql/essentials/minikql/computation/mkql_block_builder.h>
@@ -17,6 +19,7 @@ T6464Samples MakeKeyed6464Samples(const ui64 seed, const size_t numSamples, cons
using TString64Samples = std::vector<std::pair<std::string, ui64>>;
TString64Samples MakeKeyedString64Samples(const ui64 seed, const size_t numSamples, const unsigned int maxKey, const bool longStrings);
+
struct IWideStream : public NUdf::TBoxedValue
{
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final override {
@@ -27,7 +30,7 @@ struct IWideStream : public NUdf::TBoxedValue
NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* result, ui32 width) override = 0;
};
-template<typename K, typename V, bool EmbeddedKeys>
+template<typename K, typename V, bool EmbeddedKeys, size_t NumAggregates = 1>
struct TKVStream : public IWideStream {
using TSamples = std::vector<std::pair<K, V>>;
@@ -61,13 +64,17 @@ public:
CurrSample = Samples.begin();
}
- if (width != 2) {
- ythrow yexception() << "width 2 expected";
+ if (width != 1 + NumAggregates) {
+ ythrow yexception() << "width " << (1 + NumAggregates) << " expected";
}
// TODO: support embedded strings in values?
NativeToUnboxed<EmbeddedKeys>(CurrSample->first, result[0]);
- NativeToUnboxed<false>(CurrSample->second, result[1]);
+ NUdf::TUnboxedValuePod val;
+ NativeToUnboxed<false>(CurrSample->second, val);
+ for (size_t i = 0; i < NumAggregates; ++i) {
+ result[1 + i] = val;
+ }
++CurrSample;
@@ -75,7 +82,8 @@ public:
}
};
-template<typename K, typename V>
+
+template<typename K, typename V, size_t NumAggregates>
std::unordered_map<K, V> ComputeStreamSumResult(const NUdf::TUnboxedValue& wideStream)
{
std::unordered_map<K, V> expects;
@@ -91,7 +99,10 @@ std::unordered_map<K, V> ComputeStreamSumResult(const NUdf::TUnboxedValue& wideS
continue; // spilling combiners do yield sometimes
}
const K key = UnboxedToNative<K>(resultValues[0]);
- const V value = UnboxedToNative<V>(resultValues[1]);
+ V value;
+ for (size_t i = 0; i < NumAggregates; ++i) {
+ value[i] = UnboxedToNative<typename V::TElement>(resultValues[1 + i]);
+ }
expects[key] += value;
++numFetches;
}
@@ -100,53 +111,69 @@ std::unordered_map<K, V> ComputeStreamSumResult(const NUdf::TUnboxedValue& wideS
return expects;
}
-// Direct version without the BoxedValueAccessor
-template<typename K, typename V>
-std::unordered_map<K, V> ComputeStreamSumResult(IWideStream& referenceStream)
+// Direct version without the BoxedValueAccessor for use in reference timing measurements.
+// Drains referenceStream, requesting 1 + NumAggregates columns per row (the key followed by the
+// aggregate inputs), and adds each row's values to the refResult entry for its key.
+// Maps with TMapImpl::CustomOps set are updated through TMapImpl::AggregateByKey,
+// all others through operator[] and operator+=.
+template<typename K, typename V, size_t NumAggregates, typename TMapImpl>
+void ComputeReferenceStreamSumResult(IWideStream& referenceStream, typename TMapImpl::TMapType& refResult)
{
- std::unordered_map<K, V> expects;
- {
- NUdf::TUnboxedValue inputs[2];
+    // The loop below reads V::ArrayWidth value columns; all of them must have been fetched.
+    static_assert(V::ArrayWidth <= NumAggregates, "V must not be wider than the number of fetched aggregate columns");
+
+    // One slot for the key plus one per aggregate column, matching the width passed to WideFetch.
+    NUdf::TUnboxedValue inputs[1 + NumAggregates];
- while (referenceStream.WideFetch(inputs, 2) == NUdf::EFetchStatus::Ok) {
- expects[UnboxedToNative<K>(inputs[0])] += inputs[1].Get<V>();
+    while (referenceStream.WideFetch(inputs, 1 + NumAggregates) == NUdf::EFetchStatus::Ok) {
+        V tmp;
+        for (size_t i = 0; i < V::ArrayWidth; ++i) {
+            tmp[i] = inputs[1 + i].Get<typename V::TElement>();
+        }
+        if constexpr (!TMapImpl::CustomOps) {
+            refResult[UnboxedToNative<K>(inputs[0])] += tmp;
+        } else {
+            TMapImpl::AggregateByKey(refResult, UnboxedToNative<K>(inputs[0]), tmp);
}
}
- return expects;
}
-template<typename K, typename V>
-void VerifyMapsAreEqual(const std::unordered_map<K, V> computedMap, const std::unordered_map<K, V>& refMap)
+// Asserts that computedMap and refMap hold the same set of keys with equal values.
+// The sizes are compared first, then every reference entry is looked up in computedMap.
+// Reference maps with TMapImpl::CustomOps set are accessed through TMapImpl::Size and
+// TMapImpl::IteratePairs; all others through size() and range-for iteration.
+// Both maps are taken by reference and the entries are visited in place, so no map or pair is copied.
+template<typename K, typename V, typename TMapImpl>
+void VerifyMapsAreEqual(const std::unordered_map<K, V>& computedMap, const typename TMapImpl::TMapType& refMap)
{
- UNIT_ASSERT_VALUES_EQUAL(computedMap.size(), refMap.size());
-
- for (const auto referencePair : refMap) {
- const auto ii = computedMap.find(referencePair.first);
- UNIT_ASSERT(ii != computedMap.end());
- UNIT_ASSERT_VALUES_EQUAL(referencePair.second, ii->second);
+    if constexpr (!TMapImpl::CustomOps) {
+        UNIT_ASSERT_VALUES_EQUAL(computedMap.size(), refMap.size());
+        for (const auto& referencePair : refMap) {
+            const auto ii = computedMap.find(referencePair.first);
+            UNIT_ASSERT(ii != computedMap.end());
+            UNIT_ASSERT_VALUES_EQUAL(referencePair.second, ii->second);
+        }
+    } else {
+        UNIT_ASSERT_VALUES_EQUAL(computedMap.size(), TMapImpl::Size(refMap));
+        TMapImpl::IteratePairs(refMap, [&computedMap](const K& k, const V& v){
+            const auto ii = computedMap.find(k);
+            UNIT_ASSERT(ii != computedMap.end());
+            UNIT_ASSERT_VALUES_EQUAL(v, ii->second);
+        });
}
}
-template<typename K, typename V>
-void VerifyStreamVsUnorderedMap(const NUdf::TUnboxedValue& wideStream, const std::unordered_map<K, V>& refMap)
+// Sums the rows produced by wideStream per key (via ComputeStreamSumResult) and asserts
+// that the resulting totals match refMap.
+template<typename TMapImpl, typename K, typename V, size_t NumAggregates>
+void VerifyStreamVsMap(const NUdf::TUnboxedValue& wideStream, const typename TMapImpl::TMapType& refMap)
{
- std::unordered_map<K, V> resultMap = ComputeStreamSumResult<K, V>(wideStream);
- VerifyMapsAreEqual(resultMap, refMap);
+    std::unordered_map<K, V> streamTotals = ComputeStreamSumResult<K, V, NumAggregates>(wideStream);
+    VerifyMapsAreEqual<K, V, TMapImpl>(streamTotals, refMap);
}
-template<typename K, typename V>
-void VerifyListVsUnorderedMap(const NUdf::TUnboxedValue& pairList, const std::unordered_map<K, V>& refMap)
+// Sums the rows of pairList per key and asserts that the totals match refMap.
+// Each list item is read as a tuple: element 0 is the key, elements 1..NumAggregates are the
+// aggregate values that are unpacked into a V.
+template<typename TMapImpl, typename K, typename V, size_t NumAggregates>
+void VerifyListVsMap(const NUdf::TUnboxedValue& pairList, const typename TMapImpl::TMapType& refMap)
{
std::unordered_map<K, V> resultMap;
+
const auto ptr = pairList.GetElements();
for (size_t i = 0; i < pairList.GetListLength(); ++i) {
const auto elements = ptr[i].GetElements();
const auto key = UnboxedToNative<K>(elements[0]);
- const auto value = UnboxedToNative<V>(elements[1]);
+        V value;
+        // Separate index name so the row index `i` of the enclosing loop is not shadowed.
+        for (size_t aggIdx = 0; aggIdx < NumAggregates; ++aggIdx) {
+            value[aggIdx] = UnboxedToNative<typename V::TElement>(elements[1 + aggIdx]);
+        }
resultMap[key] += value;
}
- VerifyMapsAreEqual(resultMap, refMap);
+    VerifyMapsAreEqual<K, V, TMapImpl>(resultMap, refMap);
}
template<size_t Width>
@@ -181,11 +208,12 @@ public:
virtual std::string Describe() const = 0;
};
+template<typename TMapImpl, size_t NumAggregates>
class T6464DataSampler : public IDataSampler
{
public:
TKVStream<ui64, ui64, false>::TSamples Samples;
- std::unordered_map<ui64, ui64> Expects;
+ TMapImpl::TMapType Expects;
size_t StreamNumIters = 0;
@@ -197,7 +225,7 @@ public:
THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override
{
- return THolder(new TKVStream<ui64, ui64, false>(holderFactory, Samples, StreamNumIters));
+ return THolder(new TKVStream<ui64, ui64, false, NumAggregates>(holderFactory, Samples, StreamNumIters));
}
TType* GetKeyType(TProgramBuilder& pb) const override
@@ -207,32 +235,33 @@ public:
void ComputeReferenceResult(IWideStream& referenceStream) override
{
- Y_ENSURE(Expects.empty());
+ Y_ENSURE(MapEmpty<TMapImpl>(Expects));
- Expects = ComputeStreamSumResult<ui64, ui64>(referenceStream);
+ ComputeReferenceStreamSumResult<ui64, TValueWrapper<ui64, NumAggregates>, NumAggregates, TMapImpl>(referenceStream, Expects);
}
void VerifyStreamVsReference(const NUdf::TUnboxedValue& wideStream) const override
{
- VerifyStreamVsUnorderedMap(wideStream, Expects);
+ VerifyStreamVsMap<TMapImpl, ui64, TValueWrapper<ui64, NumAggregates>, NumAggregates>(wideStream, Expects);
}
void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override
{
- VerifyListVsUnorderedMap(value, Expects);
+ VerifyListVsMap<TMapImpl, ui64, TValueWrapper<ui64, NumAggregates>, NumAggregates>(value, Expects);
}
std::string Describe() const override
{
- return "ui64 keys, ui64 values";
+ return std::string("keys: ui64, values: ") + TValueWrapper<ui64, NumAggregates>::Describe();
}
};
+template<typename TMapImpl, size_t NumAggregates>
class TString64DataSampler : public IDataSampler
{
public:
- TKVStream<std::string, ui64, false>::TSamples Samples;
- std::unordered_map<std::string, ui64> Expects;
+ TKVStream<std::string, ui64, false, NumAggregates>::TSamples Samples;
+ TMapImpl::TMapType Expects;
size_t StreamNumIters = 0;
bool LongStrings = false;
@@ -247,9 +276,9 @@ public:
THolder<IWideStream> MakeStream(const THolderFactory& holderFactory) const override
{
if (LongStrings) {
- return THolder(new TKVStream<std::string, ui64, false>(holderFactory, Samples, StreamNumIters));
+ return THolder(new TKVStream<std::string, ui64, false, NumAggregates>(holderFactory, Samples, StreamNumIters));
} else {
- return THolder(new TKVStream<std::string, ui64, true>(holderFactory, Samples, StreamNumIters));
+ return THolder(new TKVStream<std::string, ui64, true, NumAggregates>(holderFactory, Samples, StreamNumIters));
}
}
@@ -260,24 +289,25 @@ public:
void ComputeReferenceResult(IWideStream& referenceStream) override
{
- Y_ENSURE(Expects.empty());
+ Y_ENSURE(MapEmpty<TMapImpl>(Expects));
- Expects = ComputeStreamSumResult<std::string, ui64>(referenceStream);
+ ComputeReferenceStreamSumResult<std::string, TValueWrapper<ui64, NumAggregates>, NumAggregates, TMapImpl>(referenceStream, Expects);
}
void VerifyStreamVsReference(const NUdf::TUnboxedValue& wideStream) const override
{
- VerifyStreamVsUnorderedMap(wideStream, Expects);
+ VerifyStreamVsMap<TMapImpl, std::string, TValueWrapper<ui64, NumAggregates>, NumAggregates>(wideStream, Expects);
}
void VerifyComputedValueVsReference(const NUdf::TUnboxedValue& value) const override
{
- VerifyListVsUnorderedMap(value, Expects);
+ VerifyListVsMap<TMapImpl, std::string, TValueWrapper<ui64, NumAggregates>, NumAggregates>(value, Expects);
}
std::string Describe() const override
{
- return Sprintf("%s string keys, ui64 values", LongStrings ? "Long (24 byte)" : "Embedded");
+ std::string valueDescr = TValueWrapper<ui64, NumAggregates>::Describe();
+ return Sprintf("keys: %s string, values: %s", LongStrings ? "Long (24 byte)" : "Embedded", valueDescr.c_str());
}
};
diff --git a/ydb/core/kqp/tools/combiner_perf/value_wrapper.h b/ydb/core/kqp/tools/combiner_perf/value_wrapper.h
new file mode 100644
index 00000000000..e59481bf40e
--- /dev/null
+++ b/ydb/core/kqp/tools/combiner_perf/value_wrapper.h
@@ -0,0 +1,59 @@
+#pragma once
+
+#include <util/generic/string.h>
+#include <util/stream/output.h>
+#include <util/string/printf.h>
+
+#include <array>
+#include <string>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+// A simple wrapper for aggregate sums over both scalar values and tuples.
+// Just a statically-sized array for now.
+
+// Fixed-width array of Width values of type V that supports element-wise accumulation.
+template<typename V, size_t Width>
+struct TValueWrapper: public std::array<V, Width>
+{
+    using TElement = V;
+    constexpr static const size_t ArrayWidth = Width;
+
+    // Value-initializes every element, so a freshly created wrapper can be used as an accumulator.
+    TValueWrapper()
+        : std::array<V, Width>{}
+    {
+    }
+
+    // Adds rhs to this wrapper element by element.
+    TValueWrapper& operator+= (const TValueWrapper& rhs)
+    {
+        for (size_t i = 0; i < Width; ++i) {
+            (*this)[i] += rhs[i];
+        }
+        return *this;
+    }
+
+    // Returns a short description of the form "array<TYPE, WIDTH>";
+    // TYPE is "?" for element types other than ui64 and std::string.
+    static std::string Describe()
+    {
+        // Width is a size_t, so it is formatted with %zu (%lu is only correct where
+        // size_t happens to be unsigned long).
+        if constexpr (std::is_same_v<V, ui64>) {
+            return Sprintf("array<ui64, %zu>", Width);
+        } else if constexpr (std::is_same_v<V, std::string>) {
+            return Sprintf("array<string, %zu>", Width);
+        } else {
+            return Sprintf("array<?, %zu>", Width);
+        }
+    }
+};
+
+}
+}
+
+// Writes the wrapper's elements to `out` in index order, separated by ", ".
+template<typename V, size_t Width>
+IOutputStream& operator << (IOutputStream& out Y_LIFETIME_BOUND, const NKikimr::NMiniKQL::TValueWrapper<V, Width>& wrapper) {
+    if constexpr (Width > 0) {
+        // The first element carries no separator; every later one is preceded by ", ".
+        out << wrapper[0];
+        for (size_t pos = 1; pos < Width; ++pos) {
+            out << ", ";
+            out << wrapper[pos];
+        }
+    }
+    return out;
+} \ No newline at end of file