aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/_threads/_pool.py
blob: 99c055d2404e8f686277f6f547dd8447bac526af (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# -*- test-case-name: twisted._threads.test -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Top level thread pool interface, used to implement
L{twisted.python.threadpool}.
"""


from queue import Queue
from threading import Lock, Thread, local as LocalStorage
from typing import Callable, Optional

from typing_extensions import Protocol

from twisted.python.log import err
from ._ithreads import IWorker
from ._team import Team
from ._threadworker import LockWorker, ThreadWorker


class _ThreadFactory(Protocol):
    def __call__(self, *, target: Callable[..., object]) -> Thread:
        ...


def pool(
    currentLimit: Callable[[], int], threadFactory: _ThreadFactory = Thread
) -> Team:
    """
    Construct a L{Team} that spawns threads as a thread pool, with the given
    limiting function.

    @note: Future maintainers: while the public API for the eventual move to
        twisted.threads should look I{something} like this, and while this
        function is necessary to implement the API described by
        L{twisted.python.threadpool}, I am starting to think the idea of a hard
        upper limit on threadpool size is just bad (turning memory performance
        issues into correctness issues well before we run into memory
        pressure), and instead we should build something with reactor
        integration for slowly releasing idle threads when they're not needed
        and I{rate} limiting the creation of new threads rather than just
        hard-capping it.

    @param currentLimit: a callable that returns the current limit on the
        number of workers that the returned L{Team} should create; if it
        already has more workers than that value, no new workers will be
        created.
    @type currentLimit: 0-argument callable returning L{int}

    @param threadFactory: Factory that, when given a C{target} keyword argument,
        returns a L{threading.Thread} that will run that target.
    @type threadFactory: callable returning a L{threading.Thread}

    @return: a new L{Team}.
    """

    def startThread(target: Callable[..., object]) -> None:
        return threadFactory(target=target).start()

    def limitedWorkerCreator() -> Optional[IWorker]:
        stats = team.statistics()
        if stats.busyWorkerCount + stats.idleWorkerCount >= currentLimit():
            return None
        return ThreadWorker(startThread, Queue())

    team = Team(
        coordinator=LockWorker(Lock(), LocalStorage()),
        createWorker=limitedWorkerCreator,
        logException=err,
    )
    return team