diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
commit | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch) | |
tree | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py | |
parent | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff) | |
download | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py')
-rw-r--r-- | contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py | 1064 |
1 files changed, 532 insertions, 532 deletions
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py b/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py index 122b3fcebf..db0516a993 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py @@ -1,532 +1,532 @@ -"""Provides shared memory for direct access across processes. - -The API of this package is currently provisional. Refer to the -documentation for details. -""" - - -__all__ = [ 'SharedMemory', 'ShareableList' ] - - -from functools import partial -import mmap -import os -import errno -import struct -import secrets -import types - -if os.name == "nt": - import _winapi - _USE_POSIX = False -else: - import _posixshmem - _USE_POSIX = True - - -_O_CREX = os.O_CREAT | os.O_EXCL - -# FreeBSD (and perhaps other BSDs) limit names to 14 characters. -_SHM_SAFE_NAME_LENGTH = 14 - -# Shared memory block name prefix -if _USE_POSIX: - _SHM_NAME_PREFIX = '/psm_' -else: - _SHM_NAME_PREFIX = 'wnsm_' - - -def _make_filename(): - "Create a random filename for the shared memory object." - # number of random bytes to use for name - nbytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2 - assert nbytes >= 2, '_SHM_NAME_PREFIX too long' - name = _SHM_NAME_PREFIX + secrets.token_hex(nbytes) - assert len(name) <= _SHM_SAFE_NAME_LENGTH - return name - - -class SharedMemory: - """Creates a new shared memory block or attaches to an existing - shared memory block. - - Every shared memory block is assigned a unique name. This enables - one process to create a shared memory block with a particular name - so that a different process can attach to that same shared memory - block using that same name. - - As a resource for sharing data across processes, shared memory blocks - may outlive the original process that created them. When one process - no longer needs access to a shared memory block that might still be - needed by other processes, the close() method should be called. - When a shared memory block is no longer needed by any process, the - unlink() method should be called to ensure proper cleanup.""" - - # Defaults; enables close() and unlink() to run without errors. - _name = None - _fd = -1 - _mmap = None - _buf = None - _flags = os.O_RDWR - _mode = 0o600 - _prepend_leading_slash = True if _USE_POSIX else False - - def __init__(self, name=None, create=False, size=0): - if not size >= 0: - raise ValueError("'size' must be a positive integer") - if create: - self._flags = _O_CREX | os.O_RDWR - if size == 0: - raise ValueError("'size' must be a positive number different from zero") - if name is None and not self._flags & os.O_EXCL: - raise ValueError("'name' can only be None if create=True") - - if _USE_POSIX: - - # POSIX Shared Memory - - if name is None: - while True: - name = _make_filename() - try: - self._fd = _posixshmem.shm_open( - name, - self._flags, - mode=self._mode - ) - except FileExistsError: - continue - self._name = name - break - else: - name = "/" + name if self._prepend_leading_slash else name - self._fd = _posixshmem.shm_open( - name, - self._flags, - mode=self._mode - ) - self._name = name - try: - if create and size: - os.ftruncate(self._fd, size) - stats = os.fstat(self._fd) - size = stats.st_size - self._mmap = mmap.mmap(self._fd, size) - except OSError: - self.unlink() - raise - - from .resource_tracker import register - register(self._name, "shared_memory") - - else: - - # Windows Named Shared Memory - - if create: - while True: - temp_name = _make_filename() if name is None else name - # Create and reserve shared memory block with this name - # until it can be attached to by mmap. - h_map = _winapi.CreateFileMapping( - _winapi.INVALID_HANDLE_VALUE, - _winapi.NULL, - _winapi.PAGE_READWRITE, - (size >> 32) & 0xFFFFFFFF, - size & 0xFFFFFFFF, - temp_name - ) - try: - last_error_code = _winapi.GetLastError() - if last_error_code == _winapi.ERROR_ALREADY_EXISTS: - if name is not None: - raise FileExistsError( - errno.EEXIST, - os.strerror(errno.EEXIST), - name, - _winapi.ERROR_ALREADY_EXISTS - ) - else: - continue - self._mmap = mmap.mmap(-1, size, tagname=temp_name) - finally: - _winapi.CloseHandle(h_map) - self._name = temp_name - break - - else: - self._name = name - # Dynamically determine the existing named shared memory - # block's size which is likely a multiple of mmap.PAGESIZE. - h_map = _winapi.OpenFileMapping( - _winapi.FILE_MAP_READ, - False, - name - ) - try: - p_buf = _winapi.MapViewOfFile( - h_map, - _winapi.FILE_MAP_READ, - 0, - 0, - 0 - ) - finally: - _winapi.CloseHandle(h_map) - size = _winapi.VirtualQuerySize(p_buf) - self._mmap = mmap.mmap(-1, size, tagname=name) - - self._size = size - self._buf = memoryview(self._mmap) - - def __del__(self): - try: - self.close() - except OSError: - pass - - def __reduce__(self): - return ( - self.__class__, - ( - self.name, - False, - self.size, - ), - ) - - def __repr__(self): - return f'{self.__class__.__name__}({self.name!r}, size={self.size})' - - @property - def buf(self): - "A memoryview of contents of the shared memory block." - return self._buf - - @property - def name(self): - "Unique name that identifies the shared memory block." - reported_name = self._name - if _USE_POSIX and self._prepend_leading_slash: - if self._name.startswith("/"): - reported_name = self._name[1:] - return reported_name - - @property - def size(self): - "Size in bytes." - return self._size - - def close(self): - """Closes access to the shared memory from this instance but does - not destroy the shared memory block.""" - if self._buf is not None: - self._buf.release() - self._buf = None - if self._mmap is not None: - self._mmap.close() - self._mmap = None - if _USE_POSIX and self._fd >= 0: - os.close(self._fd) - self._fd = -1 - - def unlink(self): - """Requests that the underlying shared memory block be destroyed. - - In order to ensure proper cleanup of resources, unlink should be - called once (and only once) across all processes which have access - to the shared memory block.""" - if _USE_POSIX and self._name: - from .resource_tracker import unregister - _posixshmem.shm_unlink(self._name) - unregister(self._name, "shared_memory") - - -_encoding = "utf8" - -class ShareableList: - """Pattern for a mutable list-like object shareable via a shared - memory block. It differs from the built-in list type in that these - lists can not change their overall length (i.e. no append, insert, - etc.) - - Because values are packed into a memoryview as bytes, the struct - packing format for any storable value must require no more than 8 - characters to describe its format.""" - - # The shared memory area is organized as follows: - # - 8 bytes: number of items (N) as a 64-bit integer - # - (N + 1) * 8 bytes: offsets of each element from the start of the - # data area - # - K bytes: the data area storing item values (with encoding and size - # depending on their respective types) - # - N * 8 bytes: `struct` format string for each element - # - N bytes: index into _back_transforms_mapping for each element - # (for reconstructing the corresponding Python value) - _types_mapping = { - int: "q", - float: "d", - bool: "xxxxxxx?", - str: "%ds", - bytes: "%ds", - None.__class__: "xxxxxx?x", - } - _alignment = 8 - _back_transforms_mapping = { - 0: lambda value: value, # int, float, bool - 1: lambda value: value.rstrip(b'\x00').decode(_encoding), # str - 2: lambda value: value.rstrip(b'\x00'), # bytes - 3: lambda _value: None, # None - } - - @staticmethod - def _extract_recreation_code(value): - """Used in concert with _back_transforms_mapping to convert values - into the appropriate Python objects when retrieving them from - the list as well as when storing them.""" - if not isinstance(value, (str, bytes, None.__class__)): - return 0 - elif isinstance(value, str): - return 1 - elif isinstance(value, bytes): - return 2 - else: - return 3 # NoneType - - def __init__(self, sequence=None, *, name=None): - if name is None or sequence is not None: - sequence = sequence or () - _formats = [ - self._types_mapping[type(item)] - if not isinstance(item, (str, bytes)) - else self._types_mapping[type(item)] % ( - self._alignment * (len(item) // self._alignment + 1), - ) - for item in sequence - ] - self._list_len = len(_formats) - assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len - offset = 0 - # The offsets of each list element into the shared memory's - # data area (0 meaning the start of the data area, not the start - # of the shared memory area). - self._allocated_offsets = [0] - for fmt in _formats: - offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1]) - self._allocated_offsets.append(offset) - _recreation_codes = [ - self._extract_recreation_code(item) for item in sequence - ] - requested_size = struct.calcsize( - "q" + self._format_size_metainfo + - "".join(_formats) + - self._format_packing_metainfo + - self._format_back_transform_codes - ) - - self.shm = SharedMemory(name, create=True, size=requested_size) - else: - self.shm = SharedMemory(name) - - if sequence is not None: - _enc = _encoding - struct.pack_into( - "q" + self._format_size_metainfo, - self.shm.buf, - 0, - self._list_len, - *(self._allocated_offsets) - ) - struct.pack_into( - "".join(_formats), - self.shm.buf, - self._offset_data_start, - *(v.encode(_enc) if isinstance(v, str) else v for v in sequence) - ) - struct.pack_into( - self._format_packing_metainfo, - self.shm.buf, - self._offset_packing_formats, - *(v.encode(_enc) for v in _formats) - ) - struct.pack_into( - self._format_back_transform_codes, - self.shm.buf, - self._offset_back_transform_codes, - *(_recreation_codes) - ) - - else: - self._list_len = len(self) # Obtains size from offset 0 in buffer. - self._allocated_offsets = list( - struct.unpack_from( - self._format_size_metainfo, - self.shm.buf, - 1 * 8 - ) - ) - - def _get_packing_format(self, position): - "Gets the packing format for a single value stored in the list." - position = position if position >= 0 else position + self._list_len - if (position >= self._list_len) or (self._list_len < 0): - raise IndexError("Requested position out of range.") - - v = struct.unpack_from( - "8s", - self.shm.buf, - self._offset_packing_formats + position * 8 - )[0] - fmt = v.rstrip(b'\x00') - fmt_as_str = fmt.decode(_encoding) - - return fmt_as_str - - def _get_back_transform(self, position): - "Gets the back transformation function for a single value." - - if (position >= self._list_len) or (self._list_len < 0): - raise IndexError("Requested position out of range.") - - transform_code = struct.unpack_from( - "b", - self.shm.buf, - self._offset_back_transform_codes + position - )[0] - transform_function = self._back_transforms_mapping[transform_code] - - return transform_function - - def _set_packing_format_and_transform(self, position, fmt_as_str, value): - """Sets the packing format and back transformation code for a - single value in the list at the specified position.""" - - if (position >= self._list_len) or (self._list_len < 0): - raise IndexError("Requested position out of range.") - - struct.pack_into( - "8s", - self.shm.buf, - self._offset_packing_formats + position * 8, - fmt_as_str.encode(_encoding) - ) - - transform_code = self._extract_recreation_code(value) - struct.pack_into( - "b", - self.shm.buf, - self._offset_back_transform_codes + position, - transform_code - ) - - def __getitem__(self, position): - position = position if position >= 0 else position + self._list_len - try: - offset = self._offset_data_start + self._allocated_offsets[position] - (v,) = struct.unpack_from( - self._get_packing_format(position), - self.shm.buf, - offset - ) - except IndexError: - raise IndexError("index out of range") - - back_transform = self._get_back_transform(position) - v = back_transform(v) - - return v - - def __setitem__(self, position, value): - position = position if position >= 0 else position + self._list_len - try: - item_offset = self._allocated_offsets[position] - offset = self._offset_data_start + item_offset - current_format = self._get_packing_format(position) - except IndexError: - raise IndexError("assignment index out of range") - - if not isinstance(value, (str, bytes)): - new_format = self._types_mapping[type(value)] - encoded_value = value - else: - allocated_length = self._allocated_offsets[position + 1] - item_offset - - encoded_value = (value.encode(_encoding) - if isinstance(value, str) else value) - if len(encoded_value) > allocated_length: - raise ValueError("bytes/str item exceeds available storage") - if current_format[-1] == "s": - new_format = current_format - else: - new_format = self._types_mapping[str] % ( - allocated_length, - ) - - self._set_packing_format_and_transform( - position, - new_format, - value - ) - struct.pack_into(new_format, self.shm.buf, offset, encoded_value) - - def __reduce__(self): - return partial(self.__class__, name=self.shm.name), () - - def __len__(self): - return struct.unpack_from("q", self.shm.buf, 0)[0] - - def __repr__(self): - return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})' - - @property - def format(self): - "The struct packing format used by all currently stored items." - return "".join( - self._get_packing_format(i) for i in range(self._list_len) - ) - - @property - def _format_size_metainfo(self): - "The struct packing format used for the items' storage offsets." - return "q" * (self._list_len + 1) - - @property - def _format_packing_metainfo(self): - "The struct packing format used for the items' packing formats." - return "8s" * self._list_len - - @property - def _format_back_transform_codes(self): - "The struct packing format used for the items' back transforms." - return "b" * self._list_len - - @property - def _offset_data_start(self): - # - 8 bytes for the list length - # - (N + 1) * 8 bytes for the element offsets - return (self._list_len + 2) * 8 - - @property - def _offset_packing_formats(self): - return self._offset_data_start + self._allocated_offsets[-1] - - @property - def _offset_back_transform_codes(self): - return self._offset_packing_formats + self._list_len * 8 - - def count(self, value): - "L.count(value) -> integer -- return number of occurrences of value." - - return sum(value == entry for entry in self) - - def index(self, value): - """L.index(value) -> integer -- return first index of value. - Raises ValueError if the value is not present.""" - - for position, entry in enumerate(self): - if value == entry: - return position - else: - raise ValueError(f"{value!r} not in this container") - - __class_getitem__ = classmethod(types.GenericAlias) +"""Provides shared memory for direct access across processes. + +The API of this package is currently provisional. Refer to the +documentation for details. +""" + + +__all__ = [ 'SharedMemory', 'ShareableList' ] + + +from functools import partial +import mmap +import os +import errno +import struct +import secrets +import types + +if os.name == "nt": + import _winapi + _USE_POSIX = False +else: + import _posixshmem + _USE_POSIX = True + + +_O_CREX = os.O_CREAT | os.O_EXCL + +# FreeBSD (and perhaps other BSDs) limit names to 14 characters. +_SHM_SAFE_NAME_LENGTH = 14 + +# Shared memory block name prefix +if _USE_POSIX: + _SHM_NAME_PREFIX = '/psm_' +else: + _SHM_NAME_PREFIX = 'wnsm_' + + +def _make_filename(): + "Create a random filename for the shared memory object." + # number of random bytes to use for name + nbytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2 + assert nbytes >= 2, '_SHM_NAME_PREFIX too long' + name = _SHM_NAME_PREFIX + secrets.token_hex(nbytes) + assert len(name) <= _SHM_SAFE_NAME_LENGTH + return name + + +class SharedMemory: + """Creates a new shared memory block or attaches to an existing + shared memory block. + + Every shared memory block is assigned a unique name. This enables + one process to create a shared memory block with a particular name + so that a different process can attach to that same shared memory + block using that same name. + + As a resource for sharing data across processes, shared memory blocks + may outlive the original process that created them. When one process + no longer needs access to a shared memory block that might still be + needed by other processes, the close() method should be called. + When a shared memory block is no longer needed by any process, the + unlink() method should be called to ensure proper cleanup.""" + + # Defaults; enables close() and unlink() to run without errors. + _name = None + _fd = -1 + _mmap = None + _buf = None + _flags = os.O_RDWR + _mode = 0o600 + _prepend_leading_slash = True if _USE_POSIX else False + + def __init__(self, name=None, create=False, size=0): + if not size >= 0: + raise ValueError("'size' must be a positive integer") + if create: + self._flags = _O_CREX | os.O_RDWR + if size == 0: + raise ValueError("'size' must be a positive number different from zero") + if name is None and not self._flags & os.O_EXCL: + raise ValueError("'name' can only be None if create=True") + + if _USE_POSIX: + + # POSIX Shared Memory + + if name is None: + while True: + name = _make_filename() + try: + self._fd = _posixshmem.shm_open( + name, + self._flags, + mode=self._mode + ) + except FileExistsError: + continue + self._name = name + break + else: + name = "/" + name if self._prepend_leading_slash else name + self._fd = _posixshmem.shm_open( + name, + self._flags, + mode=self._mode + ) + self._name = name + try: + if create and size: + os.ftruncate(self._fd, size) + stats = os.fstat(self._fd) + size = stats.st_size + self._mmap = mmap.mmap(self._fd, size) + except OSError: + self.unlink() + raise + + from .resource_tracker import register + register(self._name, "shared_memory") + + else: + + # Windows Named Shared Memory + + if create: + while True: + temp_name = _make_filename() if name is None else name + # Create and reserve shared memory block with this name + # until it can be attached to by mmap. + h_map = _winapi.CreateFileMapping( + _winapi.INVALID_HANDLE_VALUE, + _winapi.NULL, + _winapi.PAGE_READWRITE, + (size >> 32) & 0xFFFFFFFF, + size & 0xFFFFFFFF, + temp_name + ) + try: + last_error_code = _winapi.GetLastError() + if last_error_code == _winapi.ERROR_ALREADY_EXISTS: + if name is not None: + raise FileExistsError( + errno.EEXIST, + os.strerror(errno.EEXIST), + name, + _winapi.ERROR_ALREADY_EXISTS + ) + else: + continue + self._mmap = mmap.mmap(-1, size, tagname=temp_name) + finally: + _winapi.CloseHandle(h_map) + self._name = temp_name + break + + else: + self._name = name + # Dynamically determine the existing named shared memory + # block's size which is likely a multiple of mmap.PAGESIZE. + h_map = _winapi.OpenFileMapping( + _winapi.FILE_MAP_READ, + False, + name + ) + try: + p_buf = _winapi.MapViewOfFile( + h_map, + _winapi.FILE_MAP_READ, + 0, + 0, + 0 + ) + finally: + _winapi.CloseHandle(h_map) + size = _winapi.VirtualQuerySize(p_buf) + self._mmap = mmap.mmap(-1, size, tagname=name) + + self._size = size + self._buf = memoryview(self._mmap) + + def __del__(self): + try: + self.close() + except OSError: + pass + + def __reduce__(self): + return ( + self.__class__, + ( + self.name, + False, + self.size, + ), + ) + + def __repr__(self): + return f'{self.__class__.__name__}({self.name!r}, size={self.size})' + + @property + def buf(self): + "A memoryview of contents of the shared memory block." + return self._buf + + @property + def name(self): + "Unique name that identifies the shared memory block." + reported_name = self._name + if _USE_POSIX and self._prepend_leading_slash: + if self._name.startswith("/"): + reported_name = self._name[1:] + return reported_name + + @property + def size(self): + "Size in bytes." + return self._size + + def close(self): + """Closes access to the shared memory from this instance but does + not destroy the shared memory block.""" + if self._buf is not None: + self._buf.release() + self._buf = None + if self._mmap is not None: + self._mmap.close() + self._mmap = None + if _USE_POSIX and self._fd >= 0: + os.close(self._fd) + self._fd = -1 + + def unlink(self): + """Requests that the underlying shared memory block be destroyed. + + In order to ensure proper cleanup of resources, unlink should be + called once (and only once) across all processes which have access + to the shared memory block.""" + if _USE_POSIX and self._name: + from .resource_tracker import unregister + _posixshmem.shm_unlink(self._name) + unregister(self._name, "shared_memory") + + +_encoding = "utf8" + +class ShareableList: + """Pattern for a mutable list-like object shareable via a shared + memory block. It differs from the built-in list type in that these + lists can not change their overall length (i.e. no append, insert, + etc.) + + Because values are packed into a memoryview as bytes, the struct + packing format for any storable value must require no more than 8 + characters to describe its format.""" + + # The shared memory area is organized as follows: + # - 8 bytes: number of items (N) as a 64-bit integer + # - (N + 1) * 8 bytes: offsets of each element from the start of the + # data area + # - K bytes: the data area storing item values (with encoding and size + # depending on their respective types) + # - N * 8 bytes: `struct` format string for each element + # - N bytes: index into _back_transforms_mapping for each element + # (for reconstructing the corresponding Python value) + _types_mapping = { + int: "q", + float: "d", + bool: "xxxxxxx?", + str: "%ds", + bytes: "%ds", + None.__class__: "xxxxxx?x", + } + _alignment = 8 + _back_transforms_mapping = { + 0: lambda value: value, # int, float, bool + 1: lambda value: value.rstrip(b'\x00').decode(_encoding), # str + 2: lambda value: value.rstrip(b'\x00'), # bytes + 3: lambda _value: None, # None + } + + @staticmethod + def _extract_recreation_code(value): + """Used in concert with _back_transforms_mapping to convert values + into the appropriate Python objects when retrieving them from + the list as well as when storing them.""" + if not isinstance(value, (str, bytes, None.__class__)): + return 0 + elif isinstance(value, str): + return 1 + elif isinstance(value, bytes): + return 2 + else: + return 3 # NoneType + + def __init__(self, sequence=None, *, name=None): + if name is None or sequence is not None: + sequence = sequence or () + _formats = [ + self._types_mapping[type(item)] + if not isinstance(item, (str, bytes)) + else self._types_mapping[type(item)] % ( + self._alignment * (len(item) // self._alignment + 1), + ) + for item in sequence + ] + self._list_len = len(_formats) + assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len + offset = 0 + # The offsets of each list element into the shared memory's + # data area (0 meaning the start of the data area, not the start + # of the shared memory area). + self._allocated_offsets = [0] + for fmt in _formats: + offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1]) + self._allocated_offsets.append(offset) + _recreation_codes = [ + self._extract_recreation_code(item) for item in sequence + ] + requested_size = struct.calcsize( + "q" + self._format_size_metainfo + + "".join(_formats) + + self._format_packing_metainfo + + self._format_back_transform_codes + ) + + self.shm = SharedMemory(name, create=True, size=requested_size) + else: + self.shm = SharedMemory(name) + + if sequence is not None: + _enc = _encoding + struct.pack_into( + "q" + self._format_size_metainfo, + self.shm.buf, + 0, + self._list_len, + *(self._allocated_offsets) + ) + struct.pack_into( + "".join(_formats), + self.shm.buf, + self._offset_data_start, + *(v.encode(_enc) if isinstance(v, str) else v for v in sequence) + ) + struct.pack_into( + self._format_packing_metainfo, + self.shm.buf, + self._offset_packing_formats, + *(v.encode(_enc) for v in _formats) + ) + struct.pack_into( + self._format_back_transform_codes, + self.shm.buf, + self._offset_back_transform_codes, + *(_recreation_codes) + ) + + else: + self._list_len = len(self) # Obtains size from offset 0 in buffer. + self._allocated_offsets = list( + struct.unpack_from( + self._format_size_metainfo, + self.shm.buf, + 1 * 8 + ) + ) + + def _get_packing_format(self, position): + "Gets the packing format for a single value stored in the list." + position = position if position >= 0 else position + self._list_len + if (position >= self._list_len) or (self._list_len < 0): + raise IndexError("Requested position out of range.") + + v = struct.unpack_from( + "8s", + self.shm.buf, + self._offset_packing_formats + position * 8 + )[0] + fmt = v.rstrip(b'\x00') + fmt_as_str = fmt.decode(_encoding) + + return fmt_as_str + + def _get_back_transform(self, position): + "Gets the back transformation function for a single value." + + if (position >= self._list_len) or (self._list_len < 0): + raise IndexError("Requested position out of range.") + + transform_code = struct.unpack_from( + "b", + self.shm.buf, + self._offset_back_transform_codes + position + )[0] + transform_function = self._back_transforms_mapping[transform_code] + + return transform_function + + def _set_packing_format_and_transform(self, position, fmt_as_str, value): + """Sets the packing format and back transformation code for a + single value in the list at the specified position.""" + + if (position >= self._list_len) or (self._list_len < 0): + raise IndexError("Requested position out of range.") + + struct.pack_into( + "8s", + self.shm.buf, + self._offset_packing_formats + position * 8, + fmt_as_str.encode(_encoding) + ) + + transform_code = self._extract_recreation_code(value) + struct.pack_into( + "b", + self.shm.buf, + self._offset_back_transform_codes + position, + transform_code + ) + + def __getitem__(self, position): + position = position if position >= 0 else position + self._list_len + try: + offset = self._offset_data_start + self._allocated_offsets[position] + (v,) = struct.unpack_from( + self._get_packing_format(position), + self.shm.buf, + offset + ) + except IndexError: + raise IndexError("index out of range") + + back_transform = self._get_back_transform(position) + v = back_transform(v) + + return v + + def __setitem__(self, position, value): + position = position if position >= 0 else position + self._list_len + try: + item_offset = self._allocated_offsets[position] + offset = self._offset_data_start + item_offset + current_format = self._get_packing_format(position) + except IndexError: + raise IndexError("assignment index out of range") + + if not isinstance(value, (str, bytes)): + new_format = self._types_mapping[type(value)] + encoded_value = value + else: + allocated_length = self._allocated_offsets[position + 1] - item_offset + + encoded_value = (value.encode(_encoding) + if isinstance(value, str) else value) + if len(encoded_value) > allocated_length: + raise ValueError("bytes/str item exceeds available storage") + if current_format[-1] == "s": + new_format = current_format + else: + new_format = self._types_mapping[str] % ( + allocated_length, + ) + + self._set_packing_format_and_transform( + position, + new_format, + value + ) + struct.pack_into(new_format, self.shm.buf, offset, encoded_value) + + def __reduce__(self): + return partial(self.__class__, name=self.shm.name), () + + def __len__(self): + return struct.unpack_from("q", self.shm.buf, 0)[0] + + def __repr__(self): + return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})' + + @property + def format(self): + "The struct packing format used by all currently stored items." + return "".join( + self._get_packing_format(i) for i in range(self._list_len) + ) + + @property + def _format_size_metainfo(self): + "The struct packing format used for the items' storage offsets." + return "q" * (self._list_len + 1) + + @property + def _format_packing_metainfo(self): + "The struct packing format used for the items' packing formats." + return "8s" * self._list_len + + @property + def _format_back_transform_codes(self): + "The struct packing format used for the items' back transforms." + return "b" * self._list_len + + @property + def _offset_data_start(self): + # - 8 bytes for the list length + # - (N + 1) * 8 bytes for the element offsets + return (self._list_len + 2) * 8 + + @property + def _offset_packing_formats(self): + return self._offset_data_start + self._allocated_offsets[-1] + + @property + def _offset_back_transform_codes(self): + return self._offset_packing_formats + self._list_len * 8 + + def count(self, value): + "L.count(value) -> integer -- return number of occurrences of value." + + return sum(value == entry for entry in self) + + def index(self, value): + """L.index(value) -> integer -- return first index of value. + Raises ValueError if the value is not present.""" + + for position, entry in enumerate(self): + if value == entry: + return position + else: + raise ValueError(f"{value!r} not in this container") + + __class_getitem__ = classmethod(types.GenericAlias) |