aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/zoneinfo/_common.py
blob: 33311c135799dec6fbf4f55fd5ca1bc1b279abea (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import struct 
 
 
def load_tzdata(key): 
    import importlib.resources 
 
    components = key.split("/") 
    package_name = ".".join(["tzdata.zoneinfo"] + components[:-1]) 
    resource_name = components[-1] 
 
    try: 
        return importlib.resources.open_binary(package_name, resource_name) 
    except (ImportError, FileNotFoundError, UnicodeEncodeError): 
        # There are three types of exception that can be raised that all amount 
        # to "we cannot find this key": 
        # 
        # ImportError: If package_name doesn't exist (e.g. if tzdata is not 
        #   installed, or if there's an error in the folder name like 
        #   Amrica/New_York) 
        # FileNotFoundError: If resource_name doesn't exist in the package 
        #   (e.g. Europe/Krasnoy) 
        # UnicodeEncodeError: If package_name or resource_name are not UTF-8, 
        #   such as keys containing a surrogate character. 
        raise ZoneInfoNotFoundError(f"No time zone found with key {key}") 
 
 
def load_data(fobj): 
    header = _TZifHeader.from_file(fobj) 
 
    if header.version == 1: 
        time_size = 4 
        time_type = "l" 
    else: 
        # Version 2+ has 64-bit integer transition times 
        time_size = 8 
        time_type = "q" 
 
        # Version 2+ also starts with a Version 1 header and data, which 
        # we need to skip now 
        skip_bytes = ( 
            header.timecnt * 5  # Transition times and types 
            + header.typecnt * 6  # Local time type records 
            + header.charcnt  # Time zone designations 
            + header.leapcnt * 8  # Leap second records 
            + header.isstdcnt  # Standard/wall indicators 
            + header.isutcnt  # UT/local indicators 
        ) 
 
        fobj.seek(skip_bytes, 1) 
 
        # Now we need to read the second header, which is not the same 
        # as the first 
        header = _TZifHeader.from_file(fobj) 
 
    typecnt = header.typecnt 
    timecnt = header.timecnt 
    charcnt = header.charcnt 
 
    # The data portion starts with timecnt transitions and indices 
    if timecnt: 
        trans_list_utc = struct.unpack( 
            f">{timecnt}{time_type}", fobj.read(timecnt * time_size) 
        ) 
        trans_idx = struct.unpack(f">{timecnt}B", fobj.read(timecnt)) 
    else: 
        trans_list_utc = () 
        trans_idx = () 
 
    # Read the ttinfo struct, (utoff, isdst, abbrind) 
    if typecnt: 
        utcoff, isdst, abbrind = zip( 
            *(struct.unpack(">lbb", fobj.read(6)) for i in range(typecnt)) 
        ) 
    else: 
        utcoff = () 
        isdst = () 
        abbrind = () 
 
    # Now read the abbreviations. They are null-terminated strings, indexed 
    # not by position in the array but by position in the unsplit 
    # abbreviation string. I suppose this makes more sense in C, which uses 
    # null to terminate the strings, but it's inconvenient here... 
    abbr_vals = {} 
    abbr_chars = fobj.read(charcnt) 
 
    def get_abbr(idx): 
        # Gets a string starting at idx and running until the next \x00 
        # 
        # We cannot pre-populate abbr_vals by splitting on \x00 because there 
        # are some zones that use subsets of longer abbreviations, like so: 
        # 
        #  LMT\x00AHST\x00HDT\x00 
        # 
        # Where the idx to abbr mapping should be: 
        # 
        # {0: "LMT", 4: "AHST", 5: "HST", 9: "HDT"} 
        if idx not in abbr_vals: 
            span_end = abbr_chars.find(b"\x00", idx) 
            abbr_vals[idx] = abbr_chars[idx:span_end].decode() 
 
        return abbr_vals[idx] 
 
    abbr = tuple(get_abbr(idx) for idx in abbrind) 
 
    # The remainder of the file consists of leap seconds (currently unused) and 
    # the standard/wall and ut/local indicators, which are metadata we don't need. 
    # In version 2 files, we need to skip the unnecessary data to get at the TZ string: 
    if header.version >= 2: 
        # Each leap second record has size (time_size + 4) 
        skip_bytes = header.isutcnt + header.isstdcnt + header.leapcnt * 12 
        fobj.seek(skip_bytes, 1) 
 
        c = fobj.read(1)  # Should be \n 
        assert c == b"\n", c 
 
        tz_bytes = b"" 
        while (c := fobj.read(1)) != b"\n": 
            tz_bytes += c 
 
        tz_str = tz_bytes 
    else: 
        tz_str = None 
 
    return trans_idx, trans_list_utc, utcoff, isdst, abbr, tz_str 
 
 
class _TZifHeader: 
    __slots__ = [ 
        "version", 
        "isutcnt", 
        "isstdcnt", 
        "leapcnt", 
        "timecnt", 
        "typecnt", 
        "charcnt", 
    ] 
 
    def __init__(self, *args): 
        assert len(self.__slots__) == len(args) 
        for attr, val in zip(self.__slots__, args): 
            setattr(self, attr, val) 
 
    @classmethod 
    def from_file(cls, stream): 
        # The header starts with a 4-byte "magic" value 
        if stream.read(4) != b"TZif": 
            raise ValueError("Invalid TZif file: magic not found") 
 
        _version = stream.read(1) 
        if _version == b"\x00": 
            version = 1 
        else: 
            version = int(_version) 
        stream.read(15) 
 
        args = (version,) 
 
        # Slots are defined in the order that the bytes are arranged 
        args = args + struct.unpack(">6l", stream.read(24)) 
 
        return cls(*args) 
 
 
class ZoneInfoNotFoundError(KeyError): 
    """Exception raised when a ZoneInfo key is not found."""