opt
/
alt
/
python310
/
lib64
/
python3.10
/
unittest
/
Go to Home Directory
+
Upload
Create File
root@0UT1S:~$
Execute
By Order of Mr.0UT1S
[DIR] ..
N/A
[DIR] __pycache__
N/A
__init__.py
3.67 KB
Rename
Delete
__main__.py
472 bytes
Rename
Delete
_log.py
2.68 KB
Rename
Delete
async_case.py
6.22 KB
Rename
Delete
case.py
56.33 KB
Rename
Delete
loader.py
22.17 KB
Rename
Delete
main.py
10.99 KB
Rename
Delete
mock.py
100.86 KB
Rename
Delete
result.py
8.32 KB
Rename
Delete
runner.py
7.86 KB
Rename
Delete
signals.py
2.35 KB
Rename
Delete
suite.py
13.20 KB
Rename
Delete
util.py
5.09 KB
Rename
Delete
import asyncio import inspect from .case import TestCase class IsolatedAsyncioTestCase(TestCase): # Names intentionally have a long prefix # to reduce a chance of clashing with user-defined attributes # from inherited test case # # The class doesn't call loop.run_until_complete(self.setUp()) and family # but uses a different approach: # 1. create a long-running task that reads self.setUp() # awaitable from queue along with a future # 2. await the awaitable object passing in and set the result # into the future object # 3. Outer code puts the awaitable and the future object into a queue # with waiting for the future # The trick is necessary because every run_until_complete() call # creates a new task with embedded ContextVar context. # To share contextvars between setUp(), test and tearDown() we need to execute # them inside the same task. # Note: the test case modifies event loop policy if the policy was not instantiated # yet. # asyncio.get_event_loop_policy() creates a default policy on demand but never # returns None # I believe this is not an issue in user level tests but python itself for testing # should reset a policy in every test module # by calling asyncio.set_event_loop_policy(None) in tearDownModule() def __init__(self, methodName='runTest'): super().__init__(methodName) self._asyncioTestLoop = None self._asyncioCallsQueue = None async def asyncSetUp(self): pass async def asyncTearDown(self): pass def addAsyncCleanup(self, func, /, *args, **kwargs): # A trivial trampoline to addCleanup() # the function exists because it has a different semantics # and signature: # addCleanup() accepts regular functions # but addAsyncCleanup() accepts coroutines # # We intentionally don't add inspect.iscoroutinefunction() check # for func argument because there is no way # to check for async function reliably: # 1. It can be "async def func()" itself # 2. Class can implement "async def __call__()" method # 3. Regular "def func()" that returns awaitable object self.addCleanup(*(func, *args), **kwargs) def _callSetUp(self): self.setUp() self._callAsync(self.asyncSetUp) def _callTestMethod(self, method): self._callMaybeAsync(method) def _callTearDown(self): self._callAsync(self.asyncTearDown) self.tearDown() def _callCleanup(self, function, *args, **kwargs): self._callMaybeAsync(function, *args, **kwargs) def _callAsync(self, func, /, *args, **kwargs): assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized' ret = func(*args, **kwargs) assert inspect.isawaitable(ret), f'{func!r} returned non-awaitable' fut = self._asyncioTestLoop.create_future() self._asyncioCallsQueue.put_nowait((fut, ret)) return self._asyncioTestLoop.run_until_complete(fut) def _callMaybeAsync(self, func, /, *args, **kwargs): assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized' ret = func(*args, **kwargs) if inspect.isawaitable(ret): fut = self._asyncioTestLoop.create_future() self._asyncioCallsQueue.put_nowait((fut, ret)) return self._asyncioTestLoop.run_until_complete(fut) else: return ret async def _asyncioLoopRunner(self, fut): self._asyncioCallsQueue = queue = asyncio.Queue() fut.set_result(None) while True: query = await queue.get() queue.task_done() if query is None: return fut, awaitable = query try: ret = await awaitable if not fut.cancelled(): fut.set_result(ret) except (SystemExit, KeyboardInterrupt): raise except (BaseException, asyncio.CancelledError) as ex: if not fut.cancelled(): fut.set_exception(ex) def _setupAsyncioLoop(self): assert self._asyncioTestLoop is None, 'asyncio test loop already initialized' loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.set_debug(True) self._asyncioTestLoop = loop fut = loop.create_future() self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut)) loop.run_until_complete(fut) def _tearDownAsyncioLoop(self): assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized' loop = self._asyncioTestLoop self._asyncioTestLoop = None self._asyncioCallsQueue.put_nowait(None) loop.run_until_complete(self._asyncioCallsQueue.join()) try: # cancel all tasks to_cancel = asyncio.all_tasks(loop) if not to_cancel: return for task in to_cancel: task.cancel() loop.run_until_complete( asyncio.gather(*to_cancel, return_exceptions=True)) for task in to_cancel: if task.cancelled(): continue if task.exception() is not None: loop.call_exception_handler({ 'message': 'unhandled exception during test shutdown', 'exception': task.exception(), 'task': task, }) # shutdown asyncgens loop.run_until_complete(loop.shutdown_asyncgens()) finally: # Prevent our executor environment from leaking to future tests. loop.run_until_complete(loop.shutdown_default_executor()) asyncio.set_event_loop(None) loop.close() def run(self, result=None): self._setupAsyncioLoop() try: return super().run(result) finally: self._tearDownAsyncioLoop() def debug(self): self._setupAsyncioLoop() super().debug() self._tearDownAsyncioLoop() def __del__(self): if self._asyncioTestLoop is not None: self._tearDownAsyncioLoop()
Save