aboutsummaryrefslogblamecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/_threads/_memory.py
blob: 5483b8e57704496660b155a888282b6a59120daf (plain) (tree)
1
2
3
4
5
6
7
8






                                                            


                                    

                                      
                              




                                       
 
                        
 
                              






                                                         


                                                                                        




                                 
                                                     






                                                                        
                           





                                        
                                                                    




                                                                             
       
                          
                               
                                 
                        
                              


                            
# -*- test-case-name: twisted._threads.test.test_memory -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Implementation of an in-memory worker that defers execution.
"""

from __future__ import annotations

from enum import Enum, auto
from typing import Callable, Literal

from zope.interface import implementer

from ._convenience import Quit
from ._ithreads import IExclusiveWorker


class NoMore(Enum):
    Work = auto()


NoMoreWork = NoMore.Work


@implementer(IExclusiveWorker)
class MemoryWorker:
    """
    An L{IWorker} that queues work for later performance.

    @ivar _quit: a flag indicating
    @type _quit: L{Quit}
    """

    def __init__(
        self,
        pending: Callable[[], list[Callable[[], object] | Literal[NoMore.Work]]] = list,
    ) -> None:
        """
        Create a L{MemoryWorker}.
        """
        self._quit = Quit()
        self._pending = pending()

    def do(self, work: Callable[[], object]) -> None:
        """
        Queue some work for to perform later; see L{createMemoryWorker}.

        @param work: The work to perform.
        """
        self._quit.check()
        self._pending.append(work)

    def quit(self) -> None:
        """
        Quit this worker.
        """
        self._quit.set()
        self._pending.append(NoMoreWork)


def createMemoryWorker() -> tuple[MemoryWorker, Callable[[], bool]]:
    """
    Create an L{IWorker} that does nothing but defer work, to be performed
    later.

    @return: a worker that will enqueue work to perform later, and a callable
        that will perform one element of that work.
    """

    def perform() -> bool:
        if not worker._pending:
            return False
        peek = worker._pending[0]
        if peek is NoMoreWork:
            return False
        worker._pending.pop(0)
        peek()
        return True

    worker = MemoryWorker()
    return (worker, perform)