File Explorer

/proc/self/root/proc/self/root/proc/thread-self/root/lib64/python3.9/concurrent/futures

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

thread.py · 8.6 KB · 237 lines
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ThreadPoolExecutor."""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

from concurrent.futures import _base
import itertools
import queue
import threading
import types
import weakref
import os


# Maps each live worker thread to the work queue it reads from. Weak keys let
# an entry disappear once its thread object is no longer referenced.
_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _threads_queues and _shutdown.
_global_shutdown_lock = threading.Lock()


def _python_exit():
    """Flag interpreter shutdown, wake every known worker and join it."""
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    # Snapshot the mapping so that both passes below see the same threads.
    items = list(_threads_queues.items())
    # First wake all workers (None is the wake-up sentinel), then join them,
    # so that no join() waits on a worker that has not been woken yet.
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()


# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# At fork, reinitialize the `_global_shutdown_lock` lock in the child process
if hasattr(os, 'register_at_fork'):
    os.register_at_fork(before=_global_shutdown_lock.acquire,
                        after_in_child=_global_shutdown_lock._at_fork_reinit,
                        after_in_parent=_global_shutdown_lock.release)


class _WorkItem(object):
    """A pending call: the future plus the callable and arguments to run."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Call ``fn(*args, **kwargs)`` and record the outcome on the future.

        Returns without calling ``fn`` when
        ``future.set_running_or_notify_cancel()`` returns a false value.
        Any exception raised by ``fn`` (including BaseException subclasses)
        is stored on the future instead of propagating.
        """
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)

    __class_getitem__ = classmethod(types.GenericAlias)


def _worker(executor_reference, work_queue, initializer, initargs):
    """Body of a worker thread: run work items from *work_queue* until told
    to exit.

    Args:
        executor_reference: A callable returning the owning executor, or None
            once it is gone (a weakref.ref in _adjust_thread_count).
        work_queue: The queue to read from. Items are _WorkItem objects or
            None; None is a wake-up sentinel that triggers the exit check.
        initializer: Optional callable run once before any work item.
        initargs: Positional arguments passed to *initializer*.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item

                # attempt to increment idle count
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                del executor
                continue

            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown = True
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)


class BrokenThreadPool(_base.BrokenExecutor):
    """
    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
    """


class ThreadPoolExecutor(_base.Executor):

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
            TypeError: If initializer is given but is not callable.
        """
        if max_workers is None:
            # ThreadPoolExecutor is often used to:
            # * CPU bound task which releases GIL
            # * I/O bound task (which releases GIL, of course)
            #
            # We use cpu_count + 4 for both types of tasks.
            # But we limit it to 32 to avoid consuming surprisingly large resource
            # on many core machine.
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        # Released by a worker each time it finishes a work item; acquired
        # (non-blocking) by _adjust_thread_count before it starts a thread.
        self._idle_semaphore = threading.Semaphore(0)
        self._threads = set()
        # False while healthy; set to an explanatory message string by
        # _initializer_failed().
        self._broken = False
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs

    # The docstring is copied from _base.Executor.submit below.
    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock, _global_shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless an idle one is available or
        the pool already has max_workers threads."""
        # if idle threads are available, don't spin new threads
        if self._idle_semaphore.acquire(timeout=0):
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # The worker only gets a weak reference to the executor.
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue,
                                       self._initializer,
                                       self._initargs))
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def _initializer_failed(self):
        """Mark the pool as broken and fail every work item still queued."""
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Drain work queue and mark pending futures failed
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.set_exception(BrokenThreadPool(self._broken))

    # The docstring is copied from _base.Executor.shutdown below.
    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__