aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/restricted/google/benchmark/tools/compare/gbench/util.py
blob: 1119a1a2ca289ab790266b74b3330264c43facc1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""util.py - General utilities for running, loading, and processing benchmarks"""

import json
import os
import re
import subprocess
import sys
import tempfile

# Input file type enumeration
IT_Invalid = 0
IT_JSON = 1
IT_Executable = 2

_num_magic_bytes = 2 if sys.platform.startswith("win") else 4


def is_executable_file(filename):
    """
    Return 'True' if 'filename' names a valid file which is likely
    an executable. A file is considered an executable if it starts with the
    magic bytes for a EXE, Mach O, or ELF file.
    """
    if not os.path.isfile(filename):
        return False
    with open(filename, mode="rb") as f:
        magic_bytes = f.read(_num_magic_bytes)
    if sys.platform == "darwin":
        return magic_bytes in [
            b"\xfe\xed\xfa\xce",  # MH_MAGIC
            b"\xce\xfa\xed\xfe",  # MH_CIGAM
            b"\xfe\xed\xfa\xcf",  # MH_MAGIC_64
            b"\xcf\xfa\xed\xfe",  # MH_CIGAM_64
            b"\xca\xfe\xba\xbe",  # FAT_MAGIC
            b"\xbe\xba\xfe\xca",  # FAT_CIGAM
        ]
    elif sys.platform.startswith("win"):
        return magic_bytes == b"MZ"
    else:
        return magic_bytes == b"\x7fELF"


def is_json_file(filename):
    """
    Returns 'True' if 'filename' names a valid JSON output file.
    'False' otherwise.
    """
    try:
        with open(filename, "r") as f:
            json.load(f)
        return True
    except BaseException:
        pass
    return False


def classify_input_file(filename):
    """
    Return a tuple (type, msg) where 'type' specifies the classified type
    of 'filename'. If 'type' is 'IT_Invalid' then 'msg' is a human readable
    string representing the error.
    """
    ftype = IT_Invalid
    err_msg = None
    if not os.path.exists(filename):
        err_msg = "'%s' does not exist" % filename
    elif not os.path.isfile(filename):
        err_msg = "'%s' does not name a file" % filename
    elif is_executable_file(filename):
        ftype = IT_Executable
    elif is_json_file(filename):
        ftype = IT_JSON
    else:
        err_msg = (
            "'%s' does not name a valid benchmark executable or JSON file"
            % filename
        )
    return ftype, err_msg


def check_input_file(filename):
    """
    Classify the file named by 'filename' and return the classification.
    If the file is classified as 'IT_Invalid' print an error message and exit
    the program.
    """
    ftype, msg = classify_input_file(filename)
    if ftype == IT_Invalid:
        print("Invalid input file: %s" % msg)
        sys.exit(1)
    return ftype


def find_benchmark_flag(prefix, benchmark_flags):
    """
    Search the specified list of flags for a flag matching `<prefix><arg>` and
    if it is found return the arg it specifies. If specified more than once the
    last value is returned. If the flag is not found None is returned.
    """
    assert prefix.startswith("--") and prefix.endswith("=")
    result = None
    for f in benchmark_flags:
        if f.startswith(prefix):
            result = f[len(prefix) :]
    return result


def remove_benchmark_flags(prefix, benchmark_flags):
    """
    Return a new list containing the specified benchmark_flags except those
    with the specified prefix.
    """
    assert prefix.startswith("--") and prefix.endswith("=")
    return [f for f in benchmark_flags if not f.startswith(prefix)]


def load_benchmark_results(fname, benchmark_filter):
    """
    Read benchmark output from a file and return the JSON object.

    Apply benchmark_filter, a regular expression, with nearly the same
    semantics of the --benchmark_filter argument.  May be None.
    Note: the Python regular expression engine is used instead of the
    one used by the C++ code, which may produce different results
    in complex cases.

    REQUIRES: 'fname' names a file containing JSON benchmark output.
    """

    def benchmark_wanted(benchmark):
        if benchmark_filter is None:
            return True
        name = benchmark.get("run_name", None) or benchmark["name"]
        return re.search(benchmark_filter, name) is not None

    with open(fname, "r") as f:
        results = json.load(f)
        if "context" in results:
            if "json_schema_version" in results["context"]:
                json_schema_version = results["context"]["json_schema_version"]
                if json_schema_version != 1:
                    print(
                        "In %s, got unnsupported JSON schema version: %i, expected 1"
                        % (fname, json_schema_version)
                    )
                    sys.exit(1)
        if "benchmarks" in results:
            results["benchmarks"] = list(
                filter(benchmark_wanted, results["benchmarks"])
            )
        return results


def sort_benchmark_results(result):
    benchmarks = result["benchmarks"]

    # From inner key to the outer key!
    benchmarks = sorted(
        benchmarks,
        key=lambda benchmark: benchmark["repetition_index"]
        if "repetition_index" in benchmark
        else -1,
    )
    benchmarks = sorted(
        benchmarks,
        key=lambda benchmark: 1
        if "run_type" in benchmark and benchmark["run_type"] == "aggregate"
        else 0,
    )
    benchmarks = sorted(
        benchmarks,
        key=lambda benchmark: benchmark["per_family_instance_index"]
        if "per_family_instance_index" in benchmark
        else -1,
    )
    benchmarks = sorted(
        benchmarks,
        key=lambda benchmark: benchmark["family_index"]
        if "family_index" in benchmark
        else -1,
    )

    result["benchmarks"] = benchmarks
    return result


def run_benchmark(exe_name, benchmark_flags):
    """
    Run a benchmark specified by 'exe_name' with the specified
    'benchmark_flags'. The benchmark is run directly as a subprocess to preserve
    real time console output.
    RETURNS: A JSON object representing the benchmark output
    """
    output_name = find_benchmark_flag("--benchmark_out=", benchmark_flags)
    is_temp_output = False
    if output_name is None:
        is_temp_output = True
        thandle, output_name = tempfile.mkstemp()
        os.close(thandle)
        benchmark_flags = list(benchmark_flags) + [
            "--benchmark_out=%s" % output_name
        ]

    cmd = [exe_name] + benchmark_flags
    print("RUNNING: %s" % " ".join(cmd))
    exitCode = subprocess.call(cmd)
    if exitCode != 0:
        print("TEST FAILED...")
        sys.exit(exitCode)
    json_res = load_benchmark_results(output_name, None)
    if is_temp_output:
        os.unlink(output_name)
    return json_res


def run_or_load_benchmark(filename, benchmark_flags):
    """
    Get the results for a specified benchmark. If 'filename' specifies
    an executable benchmark then the results are generated by running the
    benchmark. Otherwise 'filename' must name a valid JSON output file,
    which is loaded and the result returned.
    """
    ftype = check_input_file(filename)
    if ftype == IT_JSON:
        benchmark_filter = find_benchmark_flag(
            "--benchmark_filter=", benchmark_flags
        )
        return load_benchmark_results(filename, benchmark_filter)
    if ftype == IT_Executable:
        return run_benchmark(filename, benchmark_flags)
    raise ValueError("Unknown file type %s" % ftype)