aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.ru>2022-02-10 16:44:30 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:30 +0300
commit2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch)
tree012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py
parent6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff)
downloadydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py')
-rw-r--r--contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py1064
1 files changed, 532 insertions, 532 deletions
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py b/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py
index 122b3fcebf..db0516a993 100644
--- a/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py
+++ b/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py
@@ -1,532 +1,532 @@
-"""Provides shared memory for direct access across processes.
-
-The API of this package is currently provisional. Refer to the
-documentation for details.
-"""
-
-
-__all__ = [ 'SharedMemory', 'ShareableList' ]
-
-
-from functools import partial
-import mmap
-import os
-import errno
-import struct
-import secrets
-import types
-
-if os.name == "nt":
- import _winapi
- _USE_POSIX = False
-else:
- import _posixshmem
- _USE_POSIX = True
-
-
-_O_CREX = os.O_CREAT | os.O_EXCL
-
-# FreeBSD (and perhaps other BSDs) limit names to 14 characters.
-_SHM_SAFE_NAME_LENGTH = 14
-
-# Shared memory block name prefix
-if _USE_POSIX:
- _SHM_NAME_PREFIX = '/psm_'
-else:
- _SHM_NAME_PREFIX = 'wnsm_'
-
-
-def _make_filename():
- "Create a random filename for the shared memory object."
- # number of random bytes to use for name
- nbytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2
- assert nbytes >= 2, '_SHM_NAME_PREFIX too long'
- name = _SHM_NAME_PREFIX + secrets.token_hex(nbytes)
- assert len(name) <= _SHM_SAFE_NAME_LENGTH
- return name
-
-
-class SharedMemory:
- """Creates a new shared memory block or attaches to an existing
- shared memory block.
-
- Every shared memory block is assigned a unique name. This enables
- one process to create a shared memory block with a particular name
- so that a different process can attach to that same shared memory
- block using that same name.
-
- As a resource for sharing data across processes, shared memory blocks
- may outlive the original process that created them. When one process
- no longer needs access to a shared memory block that might still be
- needed by other processes, the close() method should be called.
- When a shared memory block is no longer needed by any process, the
- unlink() method should be called to ensure proper cleanup."""
-
- # Defaults; enables close() and unlink() to run without errors.
- _name = None
- _fd = -1
- _mmap = None
- _buf = None
- _flags = os.O_RDWR
- _mode = 0o600
- _prepend_leading_slash = True if _USE_POSIX else False
-
- def __init__(self, name=None, create=False, size=0):
- if not size >= 0:
- raise ValueError("'size' must be a positive integer")
- if create:
- self._flags = _O_CREX | os.O_RDWR
- if size == 0:
- raise ValueError("'size' must be a positive number different from zero")
- if name is None and not self._flags & os.O_EXCL:
- raise ValueError("'name' can only be None if create=True")
-
- if _USE_POSIX:
-
- # POSIX Shared Memory
-
- if name is None:
- while True:
- name = _make_filename()
- try:
- self._fd = _posixshmem.shm_open(
- name,
- self._flags,
- mode=self._mode
- )
- except FileExistsError:
- continue
- self._name = name
- break
- else:
- name = "/" + name if self._prepend_leading_slash else name
- self._fd = _posixshmem.shm_open(
- name,
- self._flags,
- mode=self._mode
- )
- self._name = name
- try:
- if create and size:
- os.ftruncate(self._fd, size)
- stats = os.fstat(self._fd)
- size = stats.st_size
- self._mmap = mmap.mmap(self._fd, size)
- except OSError:
- self.unlink()
- raise
-
- from .resource_tracker import register
- register(self._name, "shared_memory")
-
- else:
-
- # Windows Named Shared Memory
-
- if create:
- while True:
- temp_name = _make_filename() if name is None else name
- # Create and reserve shared memory block with this name
- # until it can be attached to by mmap.
- h_map = _winapi.CreateFileMapping(
- _winapi.INVALID_HANDLE_VALUE,
- _winapi.NULL,
- _winapi.PAGE_READWRITE,
- (size >> 32) & 0xFFFFFFFF,
- size & 0xFFFFFFFF,
- temp_name
- )
- try:
- last_error_code = _winapi.GetLastError()
- if last_error_code == _winapi.ERROR_ALREADY_EXISTS:
- if name is not None:
- raise FileExistsError(
- errno.EEXIST,
- os.strerror(errno.EEXIST),
- name,
- _winapi.ERROR_ALREADY_EXISTS
- )
- else:
- continue
- self._mmap = mmap.mmap(-1, size, tagname=temp_name)
- finally:
- _winapi.CloseHandle(h_map)
- self._name = temp_name
- break
-
- else:
- self._name = name
- # Dynamically determine the existing named shared memory
- # block's size which is likely a multiple of mmap.PAGESIZE.
- h_map = _winapi.OpenFileMapping(
- _winapi.FILE_MAP_READ,
- False,
- name
- )
- try:
- p_buf = _winapi.MapViewOfFile(
- h_map,
- _winapi.FILE_MAP_READ,
- 0,
- 0,
- 0
- )
- finally:
- _winapi.CloseHandle(h_map)
- size = _winapi.VirtualQuerySize(p_buf)
- self._mmap = mmap.mmap(-1, size, tagname=name)
-
- self._size = size
- self._buf = memoryview(self._mmap)
-
- def __del__(self):
- try:
- self.close()
- except OSError:
- pass
-
- def __reduce__(self):
- return (
- self.__class__,
- (
- self.name,
- False,
- self.size,
- ),
- )
-
- def __repr__(self):
- return f'{self.__class__.__name__}({self.name!r}, size={self.size})'
-
- @property
- def buf(self):
- "A memoryview of contents of the shared memory block."
- return self._buf
-
- @property
- def name(self):
- "Unique name that identifies the shared memory block."
- reported_name = self._name
- if _USE_POSIX and self._prepend_leading_slash:
- if self._name.startswith("/"):
- reported_name = self._name[1:]
- return reported_name
-
- @property
- def size(self):
- "Size in bytes."
- return self._size
-
- def close(self):
- """Closes access to the shared memory from this instance but does
- not destroy the shared memory block."""
- if self._buf is not None:
- self._buf.release()
- self._buf = None
- if self._mmap is not None:
- self._mmap.close()
- self._mmap = None
- if _USE_POSIX and self._fd >= 0:
- os.close(self._fd)
- self._fd = -1
-
- def unlink(self):
- """Requests that the underlying shared memory block be destroyed.
-
- In order to ensure proper cleanup of resources, unlink should be
- called once (and only once) across all processes which have access
- to the shared memory block."""
- if _USE_POSIX and self._name:
- from .resource_tracker import unregister
- _posixshmem.shm_unlink(self._name)
- unregister(self._name, "shared_memory")
-
-
-_encoding = "utf8"
-
-class ShareableList:
- """Pattern for a mutable list-like object shareable via a shared
- memory block. It differs from the built-in list type in that these
- lists can not change their overall length (i.e. no append, insert,
- etc.)
-
- Because values are packed into a memoryview as bytes, the struct
- packing format for any storable value must require no more than 8
- characters to describe its format."""
-
- # The shared memory area is organized as follows:
- # - 8 bytes: number of items (N) as a 64-bit integer
- # - (N + 1) * 8 bytes: offsets of each element from the start of the
- # data area
- # - K bytes: the data area storing item values (with encoding and size
- # depending on their respective types)
- # - N * 8 bytes: `struct` format string for each element
- # - N bytes: index into _back_transforms_mapping for each element
- # (for reconstructing the corresponding Python value)
- _types_mapping = {
- int: "q",
- float: "d",
- bool: "xxxxxxx?",
- str: "%ds",
- bytes: "%ds",
- None.__class__: "xxxxxx?x",
- }
- _alignment = 8
- _back_transforms_mapping = {
- 0: lambda value: value, # int, float, bool
- 1: lambda value: value.rstrip(b'\x00').decode(_encoding), # str
- 2: lambda value: value.rstrip(b'\x00'), # bytes
- 3: lambda _value: None, # None
- }
-
- @staticmethod
- def _extract_recreation_code(value):
- """Used in concert with _back_transforms_mapping to convert values
- into the appropriate Python objects when retrieving them from
- the list as well as when storing them."""
- if not isinstance(value, (str, bytes, None.__class__)):
- return 0
- elif isinstance(value, str):
- return 1
- elif isinstance(value, bytes):
- return 2
- else:
- return 3 # NoneType
-
- def __init__(self, sequence=None, *, name=None):
- if name is None or sequence is not None:
- sequence = sequence or ()
- _formats = [
- self._types_mapping[type(item)]
- if not isinstance(item, (str, bytes))
- else self._types_mapping[type(item)] % (
- self._alignment * (len(item) // self._alignment + 1),
- )
- for item in sequence
- ]
- self._list_len = len(_formats)
- assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len
- offset = 0
- # The offsets of each list element into the shared memory's
- # data area (0 meaning the start of the data area, not the start
- # of the shared memory area).
- self._allocated_offsets = [0]
- for fmt in _formats:
- offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1])
- self._allocated_offsets.append(offset)
- _recreation_codes = [
- self._extract_recreation_code(item) for item in sequence
- ]
- requested_size = struct.calcsize(
- "q" + self._format_size_metainfo +
- "".join(_formats) +
- self._format_packing_metainfo +
- self._format_back_transform_codes
- )
-
- self.shm = SharedMemory(name, create=True, size=requested_size)
- else:
- self.shm = SharedMemory(name)
-
- if sequence is not None:
- _enc = _encoding
- struct.pack_into(
- "q" + self._format_size_metainfo,
- self.shm.buf,
- 0,
- self._list_len,
- *(self._allocated_offsets)
- )
- struct.pack_into(
- "".join(_formats),
- self.shm.buf,
- self._offset_data_start,
- *(v.encode(_enc) if isinstance(v, str) else v for v in sequence)
- )
- struct.pack_into(
- self._format_packing_metainfo,
- self.shm.buf,
- self._offset_packing_formats,
- *(v.encode(_enc) for v in _formats)
- )
- struct.pack_into(
- self._format_back_transform_codes,
- self.shm.buf,
- self._offset_back_transform_codes,
- *(_recreation_codes)
- )
-
- else:
- self._list_len = len(self) # Obtains size from offset 0 in buffer.
- self._allocated_offsets = list(
- struct.unpack_from(
- self._format_size_metainfo,
- self.shm.buf,
- 1 * 8
- )
- )
-
- def _get_packing_format(self, position):
- "Gets the packing format for a single value stored in the list."
- position = position if position >= 0 else position + self._list_len
- if (position >= self._list_len) or (self._list_len < 0):
- raise IndexError("Requested position out of range.")
-
- v = struct.unpack_from(
- "8s",
- self.shm.buf,
- self._offset_packing_formats + position * 8
- )[0]
- fmt = v.rstrip(b'\x00')
- fmt_as_str = fmt.decode(_encoding)
-
- return fmt_as_str
-
- def _get_back_transform(self, position):
- "Gets the back transformation function for a single value."
-
- if (position >= self._list_len) or (self._list_len < 0):
- raise IndexError("Requested position out of range.")
-
- transform_code = struct.unpack_from(
- "b",
- self.shm.buf,
- self._offset_back_transform_codes + position
- )[0]
- transform_function = self._back_transforms_mapping[transform_code]
-
- return transform_function
-
- def _set_packing_format_and_transform(self, position, fmt_as_str, value):
- """Sets the packing format and back transformation code for a
- single value in the list at the specified position."""
-
- if (position >= self._list_len) or (self._list_len < 0):
- raise IndexError("Requested position out of range.")
-
- struct.pack_into(
- "8s",
- self.shm.buf,
- self._offset_packing_formats + position * 8,
- fmt_as_str.encode(_encoding)
- )
-
- transform_code = self._extract_recreation_code(value)
- struct.pack_into(
- "b",
- self.shm.buf,
- self._offset_back_transform_codes + position,
- transform_code
- )
-
- def __getitem__(self, position):
- position = position if position >= 0 else position + self._list_len
- try:
- offset = self._offset_data_start + self._allocated_offsets[position]
- (v,) = struct.unpack_from(
- self._get_packing_format(position),
- self.shm.buf,
- offset
- )
- except IndexError:
- raise IndexError("index out of range")
-
- back_transform = self._get_back_transform(position)
- v = back_transform(v)
-
- return v
-
- def __setitem__(self, position, value):
- position = position if position >= 0 else position + self._list_len
- try:
- item_offset = self._allocated_offsets[position]
- offset = self._offset_data_start + item_offset
- current_format = self._get_packing_format(position)
- except IndexError:
- raise IndexError("assignment index out of range")
-
- if not isinstance(value, (str, bytes)):
- new_format = self._types_mapping[type(value)]
- encoded_value = value
- else:
- allocated_length = self._allocated_offsets[position + 1] - item_offset
-
- encoded_value = (value.encode(_encoding)
- if isinstance(value, str) else value)
- if len(encoded_value) > allocated_length:
- raise ValueError("bytes/str item exceeds available storage")
- if current_format[-1] == "s":
- new_format = current_format
- else:
- new_format = self._types_mapping[str] % (
- allocated_length,
- )
-
- self._set_packing_format_and_transform(
- position,
- new_format,
- value
- )
- struct.pack_into(new_format, self.shm.buf, offset, encoded_value)
-
- def __reduce__(self):
- return partial(self.__class__, name=self.shm.name), ()
-
- def __len__(self):
- return struct.unpack_from("q", self.shm.buf, 0)[0]
-
- def __repr__(self):
- return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})'
-
- @property
- def format(self):
- "The struct packing format used by all currently stored items."
- return "".join(
- self._get_packing_format(i) for i in range(self._list_len)
- )
-
- @property
- def _format_size_metainfo(self):
- "The struct packing format used for the items' storage offsets."
- return "q" * (self._list_len + 1)
-
- @property
- def _format_packing_metainfo(self):
- "The struct packing format used for the items' packing formats."
- return "8s" * self._list_len
-
- @property
- def _format_back_transform_codes(self):
- "The struct packing format used for the items' back transforms."
- return "b" * self._list_len
-
- @property
- def _offset_data_start(self):
- # - 8 bytes for the list length
- # - (N + 1) * 8 bytes for the element offsets
- return (self._list_len + 2) * 8
-
- @property
- def _offset_packing_formats(self):
- return self._offset_data_start + self._allocated_offsets[-1]
-
- @property
- def _offset_back_transform_codes(self):
- return self._offset_packing_formats + self._list_len * 8
-
- def count(self, value):
- "L.count(value) -> integer -- return number of occurrences of value."
-
- return sum(value == entry for entry in self)
-
- def index(self, value):
- """L.index(value) -> integer -- return first index of value.
- Raises ValueError if the value is not present."""
-
- for position, entry in enumerate(self):
- if value == entry:
- return position
- else:
- raise ValueError(f"{value!r} not in this container")
-
- __class_getitem__ = classmethod(types.GenericAlias)
+"""Provides shared memory for direct access across processes.
+
+The API of this package is currently provisional. Refer to the
+documentation for details.
+"""
+
+
+__all__ = [ 'SharedMemory', 'ShareableList' ]
+
+
+from functools import partial
+import mmap
+import os
+import errno
+import struct
+import secrets
+import types
+
+if os.name == "nt":
+ import _winapi
+ _USE_POSIX = False
+else:
+ import _posixshmem
+ _USE_POSIX = True
+
+
+_O_CREX = os.O_CREAT | os.O_EXCL
+
+# FreeBSD (and perhaps other BSDs) limit names to 14 characters.
+_SHM_SAFE_NAME_LENGTH = 14
+
+# Shared memory block name prefix
+if _USE_POSIX:
+ _SHM_NAME_PREFIX = '/psm_'
+else:
+ _SHM_NAME_PREFIX = 'wnsm_'
+
+
+def _make_filename():
+ "Create a random filename for the shared memory object."
+ # number of random bytes to use for name
+ nbytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2
+ assert nbytes >= 2, '_SHM_NAME_PREFIX too long'
+ name = _SHM_NAME_PREFIX + secrets.token_hex(nbytes)
+ assert len(name) <= _SHM_SAFE_NAME_LENGTH
+ return name
+
+
+class SharedMemory:
+ """Creates a new shared memory block or attaches to an existing
+ shared memory block.
+
+ Every shared memory block is assigned a unique name. This enables
+ one process to create a shared memory block with a particular name
+ so that a different process can attach to that same shared memory
+ block using that same name.
+
+ As a resource for sharing data across processes, shared memory blocks
+ may outlive the original process that created them. When one process
+ no longer needs access to a shared memory block that might still be
+ needed by other processes, the close() method should be called.
+ When a shared memory block is no longer needed by any process, the
+ unlink() method should be called to ensure proper cleanup."""
+
+ # Defaults; enables close() and unlink() to run without errors.
+ _name = None
+ _fd = -1
+ _mmap = None
+ _buf = None
+ _flags = os.O_RDWR
+ _mode = 0o600
+ _prepend_leading_slash = True if _USE_POSIX else False
+
+ def __init__(self, name=None, create=False, size=0):
+ if not size >= 0:
+ raise ValueError("'size' must be a positive integer")
+ if create:
+ self._flags = _O_CREX | os.O_RDWR
+ if size == 0:
+ raise ValueError("'size' must be a positive number different from zero")
+ if name is None and not self._flags & os.O_EXCL:
+ raise ValueError("'name' can only be None if create=True")
+
+ if _USE_POSIX:
+
+ # POSIX Shared Memory
+
+ if name is None:
+ while True:
+ name = _make_filename()
+ try:
+ self._fd = _posixshmem.shm_open(
+ name,
+ self._flags,
+ mode=self._mode
+ )
+ except FileExistsError:
+ continue
+ self._name = name
+ break
+ else:
+ name = "/" + name if self._prepend_leading_slash else name
+ self._fd = _posixshmem.shm_open(
+ name,
+ self._flags,
+ mode=self._mode
+ )
+ self._name = name
+ try:
+ if create and size:
+ os.ftruncate(self._fd, size)
+ stats = os.fstat(self._fd)
+ size = stats.st_size
+ self._mmap = mmap.mmap(self._fd, size)
+ except OSError:
+ self.unlink()
+ raise
+
+ from .resource_tracker import register
+ register(self._name, "shared_memory")
+
+ else:
+
+ # Windows Named Shared Memory
+
+ if create:
+ while True:
+ temp_name = _make_filename() if name is None else name
+ # Create and reserve shared memory block with this name
+ # until it can be attached to by mmap.
+ h_map = _winapi.CreateFileMapping(
+ _winapi.INVALID_HANDLE_VALUE,
+ _winapi.NULL,
+ _winapi.PAGE_READWRITE,
+ (size >> 32) & 0xFFFFFFFF,
+ size & 0xFFFFFFFF,
+ temp_name
+ )
+ try:
+ last_error_code = _winapi.GetLastError()
+ if last_error_code == _winapi.ERROR_ALREADY_EXISTS:
+ if name is not None:
+ raise FileExistsError(
+ errno.EEXIST,
+ os.strerror(errno.EEXIST),
+ name,
+ _winapi.ERROR_ALREADY_EXISTS
+ )
+ else:
+ continue
+ self._mmap = mmap.mmap(-1, size, tagname=temp_name)
+ finally:
+ _winapi.CloseHandle(h_map)
+ self._name = temp_name
+ break
+
+ else:
+ self._name = name
+ # Dynamically determine the existing named shared memory
+ # block's size which is likely a multiple of mmap.PAGESIZE.
+ h_map = _winapi.OpenFileMapping(
+ _winapi.FILE_MAP_READ,
+ False,
+ name
+ )
+ try:
+ p_buf = _winapi.MapViewOfFile(
+ h_map,
+ _winapi.FILE_MAP_READ,
+ 0,
+ 0,
+ 0
+ )
+ finally:
+ _winapi.CloseHandle(h_map)
+ size = _winapi.VirtualQuerySize(p_buf)
+ self._mmap = mmap.mmap(-1, size, tagname=name)
+
+ self._size = size
+ self._buf = memoryview(self._mmap)
+
+ def __del__(self):
+ try:
+ self.close()
+ except OSError:
+ pass
+
+ def __reduce__(self):
+ return (
+ self.__class__,
+ (
+ self.name,
+ False,
+ self.size,
+ ),
+ )
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}({self.name!r}, size={self.size})'
+
+ @property
+ def buf(self):
+ "A memoryview of contents of the shared memory block."
+ return self._buf
+
+ @property
+ def name(self):
+ "Unique name that identifies the shared memory block."
+ reported_name = self._name
+ if _USE_POSIX and self._prepend_leading_slash:
+ if self._name.startswith("/"):
+ reported_name = self._name[1:]
+ return reported_name
+
+ @property
+ def size(self):
+ "Size in bytes."
+ return self._size
+
+ def close(self):
+ """Closes access to the shared memory from this instance but does
+ not destroy the shared memory block."""
+ if self._buf is not None:
+ self._buf.release()
+ self._buf = None
+ if self._mmap is not None:
+ self._mmap.close()
+ self._mmap = None
+ if _USE_POSIX and self._fd >= 0:
+ os.close(self._fd)
+ self._fd = -1
+
+ def unlink(self):
+ """Requests that the underlying shared memory block be destroyed.
+
+ In order to ensure proper cleanup of resources, unlink should be
+ called once (and only once) across all processes which have access
+ to the shared memory block."""
+ if _USE_POSIX and self._name:
+ from .resource_tracker import unregister
+ _posixshmem.shm_unlink(self._name)
+ unregister(self._name, "shared_memory")
+
+
+_encoding = "utf8"
+
+class ShareableList:
+ """Pattern for a mutable list-like object shareable via a shared
+ memory block. It differs from the built-in list type in that these
+ lists can not change their overall length (i.e. no append, insert,
+ etc.)
+
+ Because values are packed into a memoryview as bytes, the struct
+ packing format for any storable value must require no more than 8
+ characters to describe its format."""
+
+ # The shared memory area is organized as follows:
+ # - 8 bytes: number of items (N) as a 64-bit integer
+ # - (N + 1) * 8 bytes: offsets of each element from the start of the
+ # data area
+ # - K bytes: the data area storing item values (with encoding and size
+ # depending on their respective types)
+ # - N * 8 bytes: `struct` format string for each element
+ # - N bytes: index into _back_transforms_mapping for each element
+ # (for reconstructing the corresponding Python value)
+ _types_mapping = {
+ int: "q",
+ float: "d",
+ bool: "xxxxxxx?",
+ str: "%ds",
+ bytes: "%ds",
+ None.__class__: "xxxxxx?x",
+ }
+ _alignment = 8
+ _back_transforms_mapping = {
+ 0: lambda value: value, # int, float, bool
+ 1: lambda value: value.rstrip(b'\x00').decode(_encoding), # str
+ 2: lambda value: value.rstrip(b'\x00'), # bytes
+ 3: lambda _value: None, # None
+ }
+
+ @staticmethod
+ def _extract_recreation_code(value):
+ """Used in concert with _back_transforms_mapping to convert values
+ into the appropriate Python objects when retrieving them from
+ the list as well as when storing them."""
+ if not isinstance(value, (str, bytes, None.__class__)):
+ return 0
+ elif isinstance(value, str):
+ return 1
+ elif isinstance(value, bytes):
+ return 2
+ else:
+ return 3 # NoneType
+
+ def __init__(self, sequence=None, *, name=None):
+ if name is None or sequence is not None:
+ sequence = sequence or ()
+ _formats = [
+ self._types_mapping[type(item)]
+ if not isinstance(item, (str, bytes))
+ else self._types_mapping[type(item)] % (
+ self._alignment * (len(item) // self._alignment + 1),
+ )
+ for item in sequence
+ ]
+ self._list_len = len(_formats)
+ assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len
+ offset = 0
+ # The offsets of each list element into the shared memory's
+ # data area (0 meaning the start of the data area, not the start
+ # of the shared memory area).
+ self._allocated_offsets = [0]
+ for fmt in _formats:
+ offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1])
+ self._allocated_offsets.append(offset)
+ _recreation_codes = [
+ self._extract_recreation_code(item) for item in sequence
+ ]
+ requested_size = struct.calcsize(
+ "q" + self._format_size_metainfo +
+ "".join(_formats) +
+ self._format_packing_metainfo +
+ self._format_back_transform_codes
+ )
+
+ self.shm = SharedMemory(name, create=True, size=requested_size)
+ else:
+ self.shm = SharedMemory(name)
+
+ if sequence is not None:
+ _enc = _encoding
+ struct.pack_into(
+ "q" + self._format_size_metainfo,
+ self.shm.buf,
+ 0,
+ self._list_len,
+ *(self._allocated_offsets)
+ )
+ struct.pack_into(
+ "".join(_formats),
+ self.shm.buf,
+ self._offset_data_start,
+ *(v.encode(_enc) if isinstance(v, str) else v for v in sequence)
+ )
+ struct.pack_into(
+ self._format_packing_metainfo,
+ self.shm.buf,
+ self._offset_packing_formats,
+ *(v.encode(_enc) for v in _formats)
+ )
+ struct.pack_into(
+ self._format_back_transform_codes,
+ self.shm.buf,
+ self._offset_back_transform_codes,
+ *(_recreation_codes)
+ )
+
+ else:
+ self._list_len = len(self) # Obtains size from offset 0 in buffer.
+ self._allocated_offsets = list(
+ struct.unpack_from(
+ self._format_size_metainfo,
+ self.shm.buf,
+ 1 * 8
+ )
+ )
+
+ def _get_packing_format(self, position):
+ "Gets the packing format for a single value stored in the list."
+ position = position if position >= 0 else position + self._list_len
+ if (position >= self._list_len) or (self._list_len < 0):
+ raise IndexError("Requested position out of range.")
+
+ v = struct.unpack_from(
+ "8s",
+ self.shm.buf,
+ self._offset_packing_formats + position * 8
+ )[0]
+ fmt = v.rstrip(b'\x00')
+ fmt_as_str = fmt.decode(_encoding)
+
+ return fmt_as_str
+
+ def _get_back_transform(self, position):
+ "Gets the back transformation function for a single value."
+
+ if (position >= self._list_len) or (self._list_len < 0):
+ raise IndexError("Requested position out of range.")
+
+ transform_code = struct.unpack_from(
+ "b",
+ self.shm.buf,
+ self._offset_back_transform_codes + position
+ )[0]
+ transform_function = self._back_transforms_mapping[transform_code]
+
+ return transform_function
+
+ def _set_packing_format_and_transform(self, position, fmt_as_str, value):
+ """Sets the packing format and back transformation code for a
+ single value in the list at the specified position."""
+
+ if (position >= self._list_len) or (self._list_len < 0):
+ raise IndexError("Requested position out of range.")
+
+ struct.pack_into(
+ "8s",
+ self.shm.buf,
+ self._offset_packing_formats + position * 8,
+ fmt_as_str.encode(_encoding)
+ )
+
+ transform_code = self._extract_recreation_code(value)
+ struct.pack_into(
+ "b",
+ self.shm.buf,
+ self._offset_back_transform_codes + position,
+ transform_code
+ )
+
+ def __getitem__(self, position):
+ position = position if position >= 0 else position + self._list_len
+ try:
+ offset = self._offset_data_start + self._allocated_offsets[position]
+ (v,) = struct.unpack_from(
+ self._get_packing_format(position),
+ self.shm.buf,
+ offset
+ )
+ except IndexError:
+ raise IndexError("index out of range")
+
+ back_transform = self._get_back_transform(position)
+ v = back_transform(v)
+
+ return v
+
+ def __setitem__(self, position, value):
+ position = position if position >= 0 else position + self._list_len
+ try:
+ item_offset = self._allocated_offsets[position]
+ offset = self._offset_data_start + item_offset
+ current_format = self._get_packing_format(position)
+ except IndexError:
+ raise IndexError("assignment index out of range")
+
+ if not isinstance(value, (str, bytes)):
+ new_format = self._types_mapping[type(value)]
+ encoded_value = value
+ else:
+ allocated_length = self._allocated_offsets[position + 1] - item_offset
+
+ encoded_value = (value.encode(_encoding)
+ if isinstance(value, str) else value)
+ if len(encoded_value) > allocated_length:
+ raise ValueError("bytes/str item exceeds available storage")
+ if current_format[-1] == "s":
+ new_format = current_format
+ else:
+ new_format = self._types_mapping[str] % (
+ allocated_length,
+ )
+
+ self._set_packing_format_and_transform(
+ position,
+ new_format,
+ value
+ )
+ struct.pack_into(new_format, self.shm.buf, offset, encoded_value)
+
+ def __reduce__(self):
+ return partial(self.__class__, name=self.shm.name), ()
+
+ def __len__(self):
+ return struct.unpack_from("q", self.shm.buf, 0)[0]
+
+ def __repr__(self):
+ return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})'
+
+ @property
+ def format(self):
+ "The struct packing format used by all currently stored items."
+ return "".join(
+ self._get_packing_format(i) for i in range(self._list_len)
+ )
+
+ @property
+ def _format_size_metainfo(self):
+ "The struct packing format used for the items' storage offsets."
+ return "q" * (self._list_len + 1)
+
+ @property
+ def _format_packing_metainfo(self):
+ "The struct packing format used for the items' packing formats."
+ return "8s" * self._list_len
+
+ @property
+ def _format_back_transform_codes(self):
+ "The struct packing format used for the items' back transforms."
+ return "b" * self._list_len
+
+ @property
+ def _offset_data_start(self):
+ # - 8 bytes for the list length
+ # - (N + 1) * 8 bytes for the element offsets
+ return (self._list_len + 2) * 8
+
+ @property
+ def _offset_packing_formats(self):
+ return self._offset_data_start + self._allocated_offsets[-1]
+
+ @property
+ def _offset_back_transform_codes(self):
+ return self._offset_packing_formats + self._list_len * 8
+
+ def count(self, value):
+ "L.count(value) -> integer -- return number of occurrences of value."
+
+ return sum(value == entry for entry in self)
+
+ def index(self, value):
+ """L.index(value) -> integer -- return first index of value.
+ Raises ValueError if the value is not present."""
+
+ for position, entry in enumerate(self):
+ if value == entry:
+ return position
+ else:
+ raise ValueError(f"{value!r} not in this container")
+
+ __class_getitem__ = classmethod(types.GenericAlias)