File Explorer

/proc/self/root/proc/thread-self/root/proc/self/task/17/root/lib64/python3.9/asyncio

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

__main__.py · 3.3 KB · 128 lines
"""Interactive asyncio REPL that accepts top-level ``await``.

The console reads input on a separate thread while the event loop runs on
the main thread; each compiled input is handed to the loop for execution.
"""

import ast
import asyncio
import code
import concurrent.futures
import inspect
import sys
import threading
import types
import warnings

from . import futures


class AsyncIOInteractiveConsole(code.InteractiveConsole):
    """Interactive console whose compiled input may contain top-level await.

    Compiled code objects are executed on ``loop`` rather than on the thread
    that calls :meth:`runcode`.
    """

    def __init__(self, locals, loop):
        """Create the console.

        Args:
            locals: Namespace passed to ``code.InteractiveConsole`` and used
                as the globals of every executed code object.
            loop: Event loop on which code objects are run.
        """
        super().__init__(locals)
        # Let the compiler accept ``await`` outside of a coroutine function.
        self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT

        self.loop = loop

    def runcode(self, code):
        """Run ``code`` on the event loop and block until it has finished.

        The code object is wrapped in a function and called from a loop
        callback.  If the call returns a coroutine, it is scheduled as a
        task and this method waits for the task's outcome.

        Args:
            code: Code object produced by the console's compiler.

        Returns:
            The value produced by the code object (or by its coroutine).
            ``None`` is returned implicitly when an exception was reported.

        Raises:
            SystemExit: Propagated unchanged.  Every other exception is
                reported through ``self.write`` / ``self.showtraceback``.
        """
        # Thread-safe future used to hand the outcome from the loop thread
        # back to the calling thread.
        future = concurrent.futures.Future()

        def callback():
            # Module-level state shared with the ``__main__`` script, which
            # cancels ``repl_future`` when the loop is interrupted.
            global repl_future
            global repl_future_interrupted

            repl_future = None
            repl_future_interrupted = False

            func = types.FunctionType(code, self.locals)
            try:
                coro = func()
            except SystemExit:
                raise
            except KeyboardInterrupt as ex:
                repl_future_interrupted = True
                future.set_exception(ex)
                return
            except BaseException as ex:
                future.set_exception(ex)
                return

            if not inspect.iscoroutine(coro):
                # Plain synchronous input: the call already produced the
                # final value.
                future.set_result(coro)
                return

            try:
                repl_future = self.loop.create_task(coro)
                futures._chain_future(repl_future, future)
            except BaseException as exc:
                future.set_exception(exc)

        # Use the loop this console was constructed with instead of relying
        # on a module-level ``loop`` name being defined.
        self.loop.call_soon_threadsafe(callback)

        try:
            return future.result()
        except SystemExit:
            raise
        except BaseException:
            if repl_future_interrupted:
                self.write("\nKeyboardInterrupt\n")
            else:
                self.showtraceback()


class REPLThread(threading.Thread):
    """Thread that drives the module-level ``console`` and stops ``loop``."""

    def run(self):
        """Run the interactive session, then ask the event loop to stop."""
        try:
            banner = (
                f'asyncio REPL {sys.version} on {sys.platform}\n'
                f'Use "await" directly instead of "asyncio.run()".\n'
                f'Type "help", "copyright", "credits" or "license" '
                f'for more information.\n'
                f'{getattr(sys, "ps1", ">>> ")}import asyncio'
            )

            console.interact(
                banner=banner,
                exitmsg='exiting asyncio REPL...')
        finally:
            # Silence "coroutine ... was never awaited" RuntimeWarnings
            # from this point on.
            warnings.filterwarnings(
                'ignore',
                message=r'^coroutine .* was never awaited$',
                category=RuntimeWarning)

            # ``loop`` runs on another thread, so the stop request has to be
            # scheduled thread-safely; it makes ``run_forever()`` return.
            loop.call_soon_threadsafe(loop.stop)


if __name__ == '__main__':
    sys.audit("cpython.run_stdin")

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Seed the REPL namespace with ``asyncio`` and this module's metadata.
    repl_locals = {'asyncio': asyncio}
    for key in {'__name__', '__package__',
                '__loader__', '__spec__',
                '__builtins__', '__file__'}:
        repl_locals[key] = locals()[key]

    console = AsyncIOInteractiveConsole(repl_locals, loop)

    # Shared with AsyncIOInteractiveConsole.runcode() through ``global``.
    repl_future = None
    repl_future_interrupted = False

    # Optional import: the name is unused and a missing module is tolerated.
    try:
        import readline  # NoQA
    except ImportError:
        pass

    repl_thread = REPLThread()
    repl_thread.daemon = True
    repl_thread.start()

    while True:
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            # Cancel the task that is still running and flag the interrupt
            # so that runcode() prints "KeyboardInterrupt" instead of a
            # traceback, then resume the loop.
            if repl_future and not repl_future.done():
                repl_future.cancel()
                repl_future_interrupted = True
            continue
        else:
            # run_forever() returned normally, i.e. loop.stop() was called.
            break