aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/zoneinfo/_common.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.ru>2022-02-10 16:44:39 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:39 +0300
commite9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (patch)
tree64175d5cadab313b3e7039ebaa06c5bc3295e274 /contrib/tools/python3/src/Lib/zoneinfo/_common.py
parent2598ef1d0aee359b4b6d5fdd1758916d5907d04f (diff)
downloadydb-e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/zoneinfo/_common.py')
-rw-r--r--contrib/tools/python3/src/Lib/zoneinfo/_common.py330
1 files changed, 165 insertions, 165 deletions
diff --git a/contrib/tools/python3/src/Lib/zoneinfo/_common.py b/contrib/tools/python3/src/Lib/zoneinfo/_common.py
index 33311c1357..41c898f37e 100644
--- a/contrib/tools/python3/src/Lib/zoneinfo/_common.py
+++ b/contrib/tools/python3/src/Lib/zoneinfo/_common.py
@@ -1,165 +1,165 @@
-import struct
-
-
-def load_tzdata(key):
- import importlib.resources
-
- components = key.split("/")
- package_name = ".".join(["tzdata.zoneinfo"] + components[:-1])
- resource_name = components[-1]
-
- try:
- return importlib.resources.open_binary(package_name, resource_name)
- except (ImportError, FileNotFoundError, UnicodeEncodeError):
- # There are three types of exception that can be raised that all amount
- # to "we cannot find this key":
- #
- # ImportError: If package_name doesn't exist (e.g. if tzdata is not
- # installed, or if there's an error in the folder name like
- # Amrica/New_York)
- # FileNotFoundError: If resource_name doesn't exist in the package
- # (e.g. Europe/Krasnoy)
- # UnicodeEncodeError: If package_name or resource_name are not UTF-8,
- # such as keys containing a surrogate character.
- raise ZoneInfoNotFoundError(f"No time zone found with key {key}")
-
-
-def load_data(fobj):
- header = _TZifHeader.from_file(fobj)
-
- if header.version == 1:
- time_size = 4
- time_type = "l"
- else:
- # Version 2+ has 64-bit integer transition times
- time_size = 8
- time_type = "q"
-
- # Version 2+ also starts with a Version 1 header and data, which
- # we need to skip now
- skip_bytes = (
- header.timecnt * 5 # Transition times and types
- + header.typecnt * 6 # Local time type records
- + header.charcnt # Time zone designations
- + header.leapcnt * 8 # Leap second records
- + header.isstdcnt # Standard/wall indicators
- + header.isutcnt # UT/local indicators
- )
-
- fobj.seek(skip_bytes, 1)
-
- # Now we need to read the second header, which is not the same
- # as the first
- header = _TZifHeader.from_file(fobj)
-
- typecnt = header.typecnt
- timecnt = header.timecnt
- charcnt = header.charcnt
-
- # The data portion starts with timecnt transitions and indices
- if timecnt:
- trans_list_utc = struct.unpack(
- f">{timecnt}{time_type}", fobj.read(timecnt * time_size)
- )
- trans_idx = struct.unpack(f">{timecnt}B", fobj.read(timecnt))
- else:
- trans_list_utc = ()
- trans_idx = ()
-
- # Read the ttinfo struct, (utoff, isdst, abbrind)
- if typecnt:
- utcoff, isdst, abbrind = zip(
- *(struct.unpack(">lbb", fobj.read(6)) for i in range(typecnt))
- )
- else:
- utcoff = ()
- isdst = ()
- abbrind = ()
-
- # Now read the abbreviations. They are null-terminated strings, indexed
- # not by position in the array but by position in the unsplit
- # abbreviation string. I suppose this makes more sense in C, which uses
- # null to terminate the strings, but it's inconvenient here...
- abbr_vals = {}
- abbr_chars = fobj.read(charcnt)
-
- def get_abbr(idx):
- # Gets a string starting at idx and running until the next \x00
- #
- # We cannot pre-populate abbr_vals by splitting on \x00 because there
- # are some zones that use subsets of longer abbreviations, like so:
- #
- # LMT\x00AHST\x00HDT\x00
- #
- # Where the idx to abbr mapping should be:
- #
- # {0: "LMT", 4: "AHST", 5: "HST", 9: "HDT"}
- if idx not in abbr_vals:
- span_end = abbr_chars.find(b"\x00", idx)
- abbr_vals[idx] = abbr_chars[idx:span_end].decode()
-
- return abbr_vals[idx]
-
- abbr = tuple(get_abbr(idx) for idx in abbrind)
-
- # The remainder of the file consists of leap seconds (currently unused) and
- # the standard/wall and ut/local indicators, which are metadata we don't need.
- # In version 2 files, we need to skip the unnecessary data to get at the TZ string:
- if header.version >= 2:
- # Each leap second record has size (time_size + 4)
- skip_bytes = header.isutcnt + header.isstdcnt + header.leapcnt * 12
- fobj.seek(skip_bytes, 1)
-
- c = fobj.read(1) # Should be \n
- assert c == b"\n", c
-
- tz_bytes = b""
- while (c := fobj.read(1)) != b"\n":
- tz_bytes += c
-
- tz_str = tz_bytes
- else:
- tz_str = None
-
- return trans_idx, trans_list_utc, utcoff, isdst, abbr, tz_str
-
-
-class _TZifHeader:
- __slots__ = [
- "version",
- "isutcnt",
- "isstdcnt",
- "leapcnt",
- "timecnt",
- "typecnt",
- "charcnt",
- ]
-
- def __init__(self, *args):
- assert len(self.__slots__) == len(args)
- for attr, val in zip(self.__slots__, args):
- setattr(self, attr, val)
-
- @classmethod
- def from_file(cls, stream):
- # The header starts with a 4-byte "magic" value
- if stream.read(4) != b"TZif":
- raise ValueError("Invalid TZif file: magic not found")
-
- _version = stream.read(1)
- if _version == b"\x00":
- version = 1
- else:
- version = int(_version)
- stream.read(15)
-
- args = (version,)
-
- # Slots are defined in the order that the bytes are arranged
- args = args + struct.unpack(">6l", stream.read(24))
-
- return cls(*args)
-
-
-class ZoneInfoNotFoundError(KeyError):
- """Exception raised when a ZoneInfo key is not found."""
+import struct
+
+
+def load_tzdata(key):
+ import importlib.resources
+
+ components = key.split("/")
+ package_name = ".".join(["tzdata.zoneinfo"] + components[:-1])
+ resource_name = components[-1]
+
+ try:
+ return importlib.resources.open_binary(package_name, resource_name)
+ except (ImportError, FileNotFoundError, UnicodeEncodeError):
+ # There are three types of exception that can be raised that all amount
+ # to "we cannot find this key":
+ #
+ # ImportError: If package_name doesn't exist (e.g. if tzdata is not
+ # installed, or if there's an error in the folder name like
+ # Amrica/New_York)
+ # FileNotFoundError: If resource_name doesn't exist in the package
+ # (e.g. Europe/Krasnoy)
+ # UnicodeEncodeError: If package_name or resource_name are not UTF-8,
+ # such as keys containing a surrogate character.
+ raise ZoneInfoNotFoundError(f"No time zone found with key {key}")
+
+
+def load_data(fobj):
+ header = _TZifHeader.from_file(fobj)
+
+ if header.version == 1:
+ time_size = 4
+ time_type = "l"
+ else:
+ # Version 2+ has 64-bit integer transition times
+ time_size = 8
+ time_type = "q"
+
+ # Version 2+ also starts with a Version 1 header and data, which
+ # we need to skip now
+ skip_bytes = (
+ header.timecnt * 5 # Transition times and types
+ + header.typecnt * 6 # Local time type records
+ + header.charcnt # Time zone designations
+ + header.leapcnt * 8 # Leap second records
+ + header.isstdcnt # Standard/wall indicators
+ + header.isutcnt # UT/local indicators
+ )
+
+ fobj.seek(skip_bytes, 1)
+
+ # Now we need to read the second header, which is not the same
+ # as the first
+ header = _TZifHeader.from_file(fobj)
+
+ typecnt = header.typecnt
+ timecnt = header.timecnt
+ charcnt = header.charcnt
+
+ # The data portion starts with timecnt transitions and indices
+ if timecnt:
+ trans_list_utc = struct.unpack(
+ f">{timecnt}{time_type}", fobj.read(timecnt * time_size)
+ )
+ trans_idx = struct.unpack(f">{timecnt}B", fobj.read(timecnt))
+ else:
+ trans_list_utc = ()
+ trans_idx = ()
+
+ # Read the ttinfo struct, (utoff, isdst, abbrind)
+ if typecnt:
+ utcoff, isdst, abbrind = zip(
+ *(struct.unpack(">lbb", fobj.read(6)) for i in range(typecnt))
+ )
+ else:
+ utcoff = ()
+ isdst = ()
+ abbrind = ()
+
+ # Now read the abbreviations. They are null-terminated strings, indexed
+ # not by position in the array but by position in the unsplit
+ # abbreviation string. I suppose this makes more sense in C, which uses
+ # null to terminate the strings, but it's inconvenient here...
+ abbr_vals = {}
+ abbr_chars = fobj.read(charcnt)
+
+ def get_abbr(idx):
+ # Gets a string starting at idx and running until the next \x00
+ #
+ # We cannot pre-populate abbr_vals by splitting on \x00 because there
+ # are some zones that use subsets of longer abbreviations, like so:
+ #
+ # LMT\x00AHST\x00HDT\x00
+ #
+ # Where the idx to abbr mapping should be:
+ #
+ # {0: "LMT", 4: "AHST", 5: "HST", 9: "HDT"}
+ if idx not in abbr_vals:
+ span_end = abbr_chars.find(b"\x00", idx)
+ abbr_vals[idx] = abbr_chars[idx:span_end].decode()
+
+ return abbr_vals[idx]
+
+ abbr = tuple(get_abbr(idx) for idx in abbrind)
+
+ # The remainder of the file consists of leap seconds (currently unused) and
+ # the standard/wall and ut/local indicators, which are metadata we don't need.
+ # In version 2 files, we need to skip the unnecessary data to get at the TZ string:
+ if header.version >= 2:
+ # Each leap second record has size (time_size + 4)
+ skip_bytes = header.isutcnt + header.isstdcnt + header.leapcnt * 12
+ fobj.seek(skip_bytes, 1)
+
+ c = fobj.read(1) # Should be \n
+ assert c == b"\n", c
+
+ tz_bytes = b""
+ while (c := fobj.read(1)) != b"\n":
+ tz_bytes += c
+
+ tz_str = tz_bytes
+ else:
+ tz_str = None
+
+ return trans_idx, trans_list_utc, utcoff, isdst, abbr, tz_str
+
+
+class _TZifHeader:
+ __slots__ = [
+ "version",
+ "isutcnt",
+ "isstdcnt",
+ "leapcnt",
+ "timecnt",
+ "typecnt",
+ "charcnt",
+ ]
+
+ def __init__(self, *args):
+ assert len(self.__slots__) == len(args)
+ for attr, val in zip(self.__slots__, args):
+ setattr(self, attr, val)
+
+ @classmethod
+ def from_file(cls, stream):
+ # The header starts with a 4-byte "magic" value
+ if stream.read(4) != b"TZif":
+ raise ValueError("Invalid TZif file: magic not found")
+
+ _version = stream.read(1)
+ if _version == b"\x00":
+ version = 1
+ else:
+ version = int(_version)
+ stream.read(15)
+
+ args = (version,)
+
+ # Slots are defined in the order that the bytes are arranged
+ args = args + struct.unpack(">6l", stream.read(24))
+
+ return cls(*args)
+
+
+class ZoneInfoNotFoundError(KeyError):
+ """Exception raised when a ZoneInfo key is not found."""