blob: 5483b8e57704496660b155a888282b6a59120daf (
plain) (
tree)
|
|
# -*- test-case-name: twisted._threads.test.test_memory -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Implementation of an in-memory worker that defers execution.
"""
from __future__ import annotations
from enum import Enum, auto
from typing import Callable, Literal
from zope.interface import implementer
from ._convenience import Quit
from ._ithreads import IExclusiveWorker
class NoMore(Enum):
    """
    A single-member enumeration whose only member serves as an end-of-queue
    marker.  Declaring the marker as an enum member lets it appear in type
    annotations as C{Literal[NoMore.Work]}.
    """

    # The sole member of the enumeration.
    Work = auto()


# Module-level alias for the marker: L{MemoryWorker.quit} appends it to the
# pending queue, and the performer returned by L{createMemoryWorker} stops
# when it finds it at the head of that queue.
NoMoreWork = NoMore.Work
@implementer(IExclusiveWorker)
class MemoryWorker:
    """
    An L{IExclusiveWorker} that runs nothing itself: each piece of work given
    to it is stored in an in-memory queue so it can be performed later.

    @ivar _quit: a flag recording whether L{MemoryWorker.quit} has been
        called on this worker.
    @type _quit: L{Quit}

    @ivar _pending: the queued work, in the order it was submitted;
        L{MemoryWorker.quit} appends the L{NoMoreWork} marker to it.
    @type _pending: L{list}
    """

    _quit: Quit
    _pending: list[Callable[[], object] | Literal[NoMore.Work]]

    def __init__(
        self,
        pending: Callable[[], list[Callable[[], object] | Literal[NoMore.Work]]] = list,
    ) -> None:
        """
        Create a L{MemoryWorker}.

        @param pending: a no-argument factory, called exactly once here, that
            produces the list used as this worker's queue.
        """
        self._quit = Quit()
        self._pending = pending()

    def do(self, work: Callable[[], object]) -> None:
        """
        Append C{work} to the queue so that it can be performed later; see
        L{createMemoryWorker}.

        @param work: The work to perform.
        """
        # Consult the quit flag before queueing anything.
        # NOTE(review): L{Quit.check} presumably raises once the worker has
        # quit -- confirm against L{Quit} in C{_convenience}.
        self._quit.check()
        queue = self._pending
        queue.append(work)

    def quit(self) -> None:
        """
        Quit this worker: set the quit flag, then mark the end of the queue
        with L{NoMoreWork}.
        """
        # Set the flag first, so the marker is only appended if setting the
        # flag did not raise.
        self._quit.set()
        queue = self._pending
        queue.append(NoMoreWork)
def createMemoryWorker() -> tuple[MemoryWorker, Callable[[], bool]]:
    """
    Create an L{IWorker} that only stores the work it is given, together with
    a function that performs that stored work one item at a time.

    @return: a 2-tuple of the worker, which enqueues work for later, and a
        no-argument callable that performs one element of that work.  The
        callable returns C{True} if it performed an item, and C{False} if
        the queue was empty or its next element was the L{NoMoreWork}
        marker.
    """
    worker = MemoryWorker()

    def perform() -> bool:
        queue = worker._pending
        # Treat an empty queue like one that has reached its end marker, so
        # that a single check covers both "nothing to do" cases.
        head = queue[0] if queue else NoMoreWork
        if head is NoMoreWork:
            # The marker itself is left in place at the head of the queue.
            return False
        # Remove the item before calling it, so it is gone from the queue
        # even if the call raises.
        del queue[0]
        head()
        return True

    return worker, perform
|