File Explorer

/proc/self/root/proc/self/root/usr/lib64/python3.9/unittest

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

async_case.py · 6.2 KB · 171 lines
import asyncio
import inspect

from .case import TestCase


class IsolatedAsyncioTestCase(TestCase):
    """TestCase whose fixtures and test methods may be coroutines.

    Every ``run()`` / ``debug()`` call creates a fresh event loop, executes
    ``setUp``/``asyncSetUp``, the test method, ``asyncTearDown``/``tearDown``
    and the cleanups against it, and then shuts the loop down again.
    """

    # Names intentionally have a long prefix
    # to reduce a chance of clashing with user-defined attributes
    # from inherited test case
    #
    # The class doesn't call loop.run_until_complete(self.setUp()) and family
    # but uses a different approach:
    # 1. create a long-running task that reads self.setUp()
    #    awaitable from queue along with a future
    # 2. await the awaitable object passing in and set the result
    #    into the future object
    # 3. Outer code puts the awaitable and the future object into a queue
    #    with waiting for the future
    # The trick is necessary because every run_until_complete() call
    # creates a new task with embedded ContextVar context.
    # To share contextvars between setUp(), test and tearDown() we need to
    # execute them inside the same task.

    # Note: the test case modifies event loop policy if the policy was not
    # instantiated yet.
    # asyncio.get_event_loop_policy() creates a default policy on demand but
    # never returns None
    # I believe this is not an issue in user level tests but python itself
    # for testing should reset a policy in every test module
    # by calling asyncio.set_event_loop_policy(None) in tearDownModule()

    def __init__(self, methodName='runTest'):
        super().__init__(methodName)
        # Both are populated by _setupAsyncioLoop() / _asyncioLoopRunner()
        # and are only meaningful while a test is running.
        self._asyncioTestLoop = None
        self._asyncioCallsQueue = None

    async def asyncSetUp(self):
        """Coroutine fixture run after setUp(); no-op by default."""
        pass

    async def asyncTearDown(self):
        """Coroutine fixture run before tearDown(); no-op by default."""
        pass

    def addAsyncCleanup(self, func, /, *args, **kwargs):
        """Register *func* (expected to return an awaitable) as a cleanup."""
        # A trivial trampoline to addCleanup()
        # the function exists because it has a different semantics
        # and signature:
        # addCleanup() accepts regular functions
        # but addAsyncCleanup() accepts coroutines
        #
        # We intentionally don't add inspect.iscoroutinefunction() check
        # for func argument because there is no way
        # to check for async function reliably:
        # 1. It can be "async def func()" itself
        # 2. Class can implement "async def __call__()" method
        # 3. Regular "def func()" that returns awaitable object
        self.addCleanup(func, *args, **kwargs)

    def _callSetUp(self):
        """Run the synchronous setUp() and then asyncSetUp() on the loop."""
        self.setUp()
        self._callAsync(self.asyncSetUp)

    def _callTestMethod(self, method):
        """Run the test method, awaiting its result if it is awaitable."""
        self._callMaybeAsync(method)

    def _callTearDown(self):
        """Run asyncTearDown() on the loop and then the synchronous tearDown()."""
        self._callAsync(self.asyncTearDown)
        self.tearDown()

    def _callCleanup(self, function, *args, **kwargs):
        """Run one registered cleanup, awaiting its result if awaitable."""
        self._callMaybeAsync(function, *args, **kwargs)

    def _asyncioRunAwaitable(self, awaitable):
        """Hand *awaitable* to the runner task and block until it finishes.

        Returns the awaitable's result, or re-raises the exception the runner
        task stored on the paired future.
        """
        fut = self._asyncioTestLoop.create_future()
        self._asyncioCallsQueue.put_nowait((fut, awaitable))
        return self._asyncioTestLoop.run_until_complete(fut)

    def _callAsync(self, func, /, *args, **kwargs):
        """Call *func* and await its result; the result must be awaitable."""
        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
        ret = func(*args, **kwargs)
        assert inspect.isawaitable(ret), f'{func!r} returned non-awaitable'
        return self._asyncioRunAwaitable(ret)

    def _callMaybeAsync(self, func, /, *args, **kwargs):
        """Call *func*; await the result on the loop only if it is awaitable."""
        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
        ret = func(*args, **kwargs)
        if inspect.isawaitable(ret):
            return self._asyncioRunAwaitable(ret)
        return ret

    async def _asyncioLoopRunner(self, fut):
        """Long-running task that awaits every queued awaitable in turn.

        *fut* is resolved once the queue exists, which signals the caller
        that work can be submitted.  A ``None`` queue item stops the task.
        """
        self._asyncioCallsQueue = queue = asyncio.Queue()
        fut.set_result(None)
        while True:
            query = await queue.get()
            queue.task_done()
            if query is None:
                return
            result_fut, awaitable = query
            try:
                ret = await awaitable
                if not result_fut.cancelled():
                    result_fut.set_result(ret)
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as ex:
                # BaseException already covers asyncio.CancelledError, so the
                # failure is forwarded to the waiting caller instead of
                # terminating this runner task.
                if not result_fut.cancelled():
                    result_fut.set_exception(ex)

    def _setupAsyncioLoop(self):
        """Create the per-test event loop and start the runner task on it."""
        assert self._asyncioTestLoop is None, 'asyncio test loop already initialized'
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.set_debug(True)
        self._asyncioTestLoop = loop
        fut = loop.create_future()
        self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
        # Wait until the runner has created the calls queue.
        loop.run_until_complete(fut)

    def _tearDownAsyncioLoop(self):
        """Stop the runner task, cancel leftover tasks and close the loop."""
        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
        loop = self._asyncioTestLoop
        self._asyncioTestLoop = None
        # None is the sentinel that makes _asyncioLoopRunner() return.
        self._asyncioCallsQueue.put_nowait(None)
        loop.run_until_complete(self._asyncioCallsQueue.join())

        try:
            # cancel all tasks
            to_cancel = asyncio.all_tasks(loop)
            if to_cancel:
                for task in to_cancel:
                    task.cancel()

                loop.run_until_complete(
                    asyncio.gather(*to_cancel, return_exceptions=True))

                for task in to_cancel:
                    if task.cancelled():
                        continue
                    if task.exception() is not None:
                        loop.call_exception_handler({
                            'message': 'unhandled exception during test shutdown',
                            'exception': task.exception(),
                            'task': task,
                        })
            # shutdown asyncgens
            # This must run even when there were no tasks left to cancel, so
            # the "no leftover tasks" case does not return early above.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            try:
                # Prevent our executor environment from leaking to future tests.
                loop.run_until_complete(loop.shutdown_default_executor())
            finally:
                # Always detach and close the loop, even if the executor
                # shutdown itself failed.
                asyncio.set_event_loop(None)
                loop.close()

    def run(self, result=None):
        """Run the test on a fresh event loop and always tear the loop down."""
        self._setupAsyncioLoop()
        try:
            return super().run(result)
        finally:
            self._tearDownAsyncioLoop()

    def debug(self):
        """Run the test on a fresh event loop, letting exceptions propagate."""
        self._setupAsyncioLoop()
        super().debug()
        # No try/finally here, unlike run(): if the test raises, the loop is
        # left in place and is torn down later by __del__().
        self._tearDownAsyncioLoop()

    def __del__(self):
        # __init__() may have failed before the attribute was assigned
        # (super().__init__() runs first), so do not assume it exists.
        if getattr(self, '_asyncioTestLoop', None) is not None:
            self._tearDownAsyncioLoop()