Source code for pykern.pkasyncio

"""Utilities to support `asyncio` and to simplify starting `tornado`.

See also `pykern.api` which provides a higher level mechanism for
websocket clients and servers.

:copyright: Copyright (c) 2022-2025 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html

"""

from pykern import pkconfig
from pykern import pkconst
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdlog, pkdp, pkdexc
import asyncio
import inspect
import queue
import re
import threading
import tornado.gen
import tornado.httpserver
import tornado.ioloop
import tornado.web

#: module configuration; ``None`` until `_init` assigns the result of `pkconfig.init`
_cfg = None

#: tasks created by `create_task`; holding them here keeps a reference until each task is done
_background_tasks = set()


class ActionLoop:
    """Processes actions in a loop fed by a `queue.Queue` running in `threading.Thread`.

    Useful to avoid blocking main loop in `asyncio` for compute or
    blocking libraries that do not support `asyncio`.

    Calling `action` is `asyncio` and thread safe. If the action methods
    transfer data back to the `asyncio` loop, use `call_soon_threadsafe`
    on the appropriate `asyncio` loop. See discussion in `action`.

    Subclass hooks (all optional unless noted):

    * ``_destroy()``: called by `destroy` after the loop has been told to stop.
    * ``_repr()``: returns a string that is included in `__repr__`.
    * ``_on_loop_timeout()``: required if `_loop_timeout_secs` is positive.
    * ``_handle_exception(exc)``: called when `_start` raises.
    """

    #: sentinel to end loop in `_start`
    _LOOP_END = object()

    #: default is to wait forever for next action; subclasses may override
    _loop_timeout_secs = 0

    def __init__(self):
        """Create the queue and start the loop thread.

        Raises:
            AssertionError: `_loop_timeout_secs` is positive and the subclass
                did not override `_on_loop_timeout`
        """
        # All these attributes must exist even after destroy()
        self.destroyed = False
        self.__lock = threading.Lock()
        self.__actions = queue.Queue()
        # You can join the thread to block another (e.g. main) thread from exiting
        # until an ActionLoop is done.
        self.thread = threading.Thread(target=self.__target, daemon=True)
        self.__get_args = PKDict()
        if self._loop_timeout_secs > 0:
            # hasattr() is always true because this class defines a default
            # _on_loop_timeout, so check for a real override instead.
            if not self.__overrides_on_loop_timeout():
                raise AssertionError(
                    f"_loop_timeout_secs={self._loop_timeout_secs} and not _on_loop_timeout"
                )
            self.__get_args.timeout = self._loop_timeout_secs
        self.thread.start()

    def action(self, method, arg):
        """Queue ``method`` to be called in loop thread.

        Actions are methods that (by convention) begin with
        ``action_`` and are called sequentially inside `_start`. A
        lock is used to prevent `destroy` being called during the
        action and serializing activities within a single action.

        Actions return ``None`` to continue on to the next action.
        `_LOOP_END` should be returned to terminate `_start` (the loop)
        in which case no further actions are performed.

        Actions can return a callable that will be called inside the
        loop and outside the lock. These returned callables are known
        as external callbacks, that is, functions that may do anything
        so holding the lock could be problematic.

        The lock is managed by this class and subclasses should not
        need locking. Resources should be "handed off" to actions via
        `arg` passed to `method`, which can "return" the resource by
        returning a callback that gets called outside the lock but
        within the single thread of control that an `ActionLoop`
        represents. Read the `Go Channels Tutorial
        <https://go101.org/article/channel.html>`_ for more information
        about using queues for resource sharing without locks.

        Args:
            method (callable or str): a method or a name used to find a method: ``self.action_<method>``
            arg (object): passed verbatim to ``method``
        """
        self.__actions.put_nowait(
            (
                (
                    getattr(self, f"action_{method}")
                    if isinstance(method, str)
                    else method
                ),
                arg,
            ),
        )

    def destroy(self):
        """Stops thread and calls subclass `_destroy`

        THREADING: subclasses should not call destroy directly. They
        should return `_LOOP_END` instead. External callbacks may call
        destroy, because _ActionLoop does not hold lock during
        external callbacks.

        Only the first call has an effect. Exceptions are logged and
        not propagated.
        """
        try:
            with self.__lock:
                if self.destroyed:
                    return
                self.destroyed = True
                # Wakes up a blocked `get` in `_start`, which then sees `destroyed`
                self.__actions.put_nowait((None, None))
            # Subclass cleanup is optional; run it outside the lock so it
            # cannot deadlock on the (non-reentrant) lock.
            d = getattr(self, "_destroy", None)
            if d is not None:
                d()
        except Exception as e:
            pkdlog("error={} {} stack={}", e, self, pkdexc(simplify=True))

    def _dispatch_action(self, method, arg):
        """Calls method with arg.

        Subclasses may re-implement. This function will remain a very
        simple wrapper for ``return method(arg)``.

        This function is called inside the lock.

        Args:
            method (callable): to be called
            arg (object): to be passed
        Returns:
            object: result of method
        """
        return method(arg)

    def _dispatch_callback(self, callback):
        """Calls callback.

        Subclasses may re-implement. This method will remain a very
        simple wrapper for ``callback()``.

        This function is called outside the lock.

        Args:
            callback (callable): to be called
        """
        callback()

    def _on_loop_timeout(self):
        """Called when a loop timeout occurs.

        Subclasses must implement this *if* they set `_loop_timeout_secs`.

        Returns:
            object: ``None`` to keep waiting for the next action,
                `_LOOP_END` to end the loop, or a callable that is
                dispatched like an action with ``arg=None``.
        """
        # `__init__` prevents this from happening, but good to document.
        raise NotImplementedError("ActionLoop._on_loop_timeout")

    def __overrides_on_loop_timeout(self):
        """True if `_on_loop_timeout` is something other than the default above."""
        h = getattr(self, "_on_loop_timeout", None)
        return (
            callable(h)
            and getattr(h, "__func__", None) is not ActionLoop._on_loop_timeout
        )

    def __repr__(self):
        """Class name, destroyed state, and the subclass's ``_repr()`` if defined."""
        s = " DESTROYED" if self.destroyed else ""
        r = getattr(self, "_repr", None)
        d = f" {r()}" if callable(r) else ""
        return f"<{self.__class__.__name__}{s}{d}>"

    def _start(self):
        """Loops over actions and exits on `_LOOP_END` or on unhandled exception.

        See `action` for details of what actions are.

        Called by `__target`. Subclasses may override this method to setup the loop.
        """
        while True:
            with self.__lock:
                if self.destroyed:
                    return
            # Must wait outside the lock so `destroy` and `action` never block on it
            try:
                m, a = self.__actions.get(**self.__get_args)
                self.__actions.task_done()
            except queue.Empty:
                m, a = self._on_loop_timeout(), None
                if m is None:
                    # Nothing to dispatch; calling None would raise TypeError
                    continue
                if m is self._LOOP_END:
                    return
            with self.__lock:
                if self.destroyed:
                    return
                # Do not need to check m, because only invalid when destroyed is True
                if (m := self._dispatch_action(m, a)) is self._LOOP_END:
                    return
                # Will be true if destroy called inside action (m)
                if self.destroyed:
                    return
            # Action returned an external callback, which must occur outside lock
            if m:
                self._dispatch_callback(m)

    def __target(self):
        """Thread's target function"""
        try:
            self._start()
        except Exception as e:
            pkdlog("error={} {} stack={}", e, self, pkdexc(simplify=True))
            try:
                self._handle_exception(e)
            except Exception as e2:
                pkdlog(
                    "_handle_exception error={} {} stack={}",
                    e2,
                    self,
                    pkdexc(simplify=True),
                )
        finally:
            self.destroy()

    def _handle_exception(self, exc):
        """Exception handler for `_start`.

        `_handle_exception` is called when there's an exception in `_start`.
        Subclasses may reimplement.

        Args:
            exc (Exception): Captured Exception.
        """
        pass
class Loop:
    """HTTP Server loop

    Collects coroutines with `run` (and `http_server`) before the
    event loop exists, then runs them all with `start`.
    """

    def __init__(self):
        _init()
        # Coroutines registered via `run`; gathered in `start`
        self._coroutines = []
        self.__http_server = False

    def http_server(self, http_cfg):
        """Instantiate a tornado web server

        Under the covers Tornado uses the asyncio event loop so asyncio
        methods can be mixed with Tornado methods. Using asyncio methods,
        e.g. `asyncio.run`, is preferred over Tornado methods,
        e.g. `tornado.ioloop.IOLoop.current` to reduce dependency on
        Tornado. Using this module should allow the code to be portable
        to other http server frameworks.

        ``http_cfg.uri_map`` maps URI expressions to classes, which is
        passed directly to `tornado.web.Application`.

        The server is not started here; a coroutine is registered with
        `run` and the server listens once `start` is called.

        Args:
            http_cfg (PKDict): keys read here are uri_map (required), debug,
                log_function, name, tcp_ip, tcp_port. Other keys
                (e.g. quest_start) are not used by this method.
        Raises:
            AssertionError: if called more than once on this instance
        """

        async def _do():
            # TODO(e-carlin): pull in the one in job_supervisor.py
            p = http_cfg.get("tcp_port", _cfg.server_port)
            i = http_cfg.get("tcp_ip", _cfg.server_ip)
            tornado.httpserver.HTTPServer(
                tornado.web.Application(
                    http_cfg.uri_map,
                    debug=http_cfg.get("debug", _cfg.debug),
                    log_function=http_cfg.get("log_function", self.http_log),
                ),
                xheaders=True,
            ).listen(p, i)
            pkdlog("name={} ip={} port={}", http_cfg.get("name"), i, p)
            # Never set: keeps this coroutine (and so the loop) alive
            await asyncio.Event().wait()

        if self.__http_server:
            raise AssertionError("http_server may only be called once")
        self.__http_server = True
        self.run(_do())

    def http_log(self, handler, which="end", fmt="", args=None):
        """Log a request via `pkdlog`.

        Args:
            handler (object): provides ``request`` and ``get_status()``
            which (str): "start" and "end" append request or response details; other values log only the prefix
            fmt (str): optional extra `pkdlog` format appended after the uri
            args (iterable): values for ``fmt``; may be None
        """
        f, a = self._http_log_fmt_args(handler, which, fmt, args)
        pkdlog(f, *a)

    def _http_log_fmt_args(self, handler, which, fmt, args):
        """Build the `pkdlog` format string and argument list for `http_log`.

        Returns:
            tuple: (format string, list of arguments)
        """
        r = handler.request
        f = "{} ip={} uri={}"
        a = [which, self.remote_peer(r), r.uri]
        if fmt:
            f += " " + fmt
            # args defaults to None, which cannot be added to a list
            if args:
                a.extend(args)
        if which == "start":
            f += " proto={} {} ref={} ua={}"
            a += [
                r.method,
                r.version,
                r.headers.get("Referer") or "",
                r.headers.get("User-Agent") or "",
            ]
        elif which == "end":
            f += " status={} ms={:.2f}"
            a += [
                handler.get_status(),
                r.request_time() * 1000.0,
            ]
        return f, a

    def remote_peer(self, request):
        """Return ``ip:port`` of the peer for ``request``.

        Uses the connection's socket when there is one. Otherwise
        falls back to the ``proxy-for`` header or ``request.remote_ip``
        with port 0.

        Args:
            request (object): provides ``connection``, ``headers``, and ``remote_ip``
        Returns:
            str: peer address formatted as ``ip:port``
        """
        # https://github.com/tornadoweb/tornado/issues/2967#issuecomment-757370594
        # implementation may change; Code in tornado.httputil check connection.
        if c := request.connection:
            # socket is not set on stream for websockets.
            if getattr(c, "stream", None) and (s := getattr(c.stream, "socket", None)):
                return "{}:{}".format(*s.getpeername())
        i = request.headers.get("proxy-for", request.remote_ip)
        return f"{i}:0"

    def run(self, *coros):
        """Register coroutines to be run by `start`.

        Args:
            coros (coroutine): one or more coroutine objects
        Raises:
            AssertionError: if an argument is not a coroutine or if an
                event loop is already running
        """
        for c in coros:
            if not inspect.iscoroutine(c):
                raise AssertionError(f"must be a coroutine arg={c} coros={coros}")
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop, which is the only valid time to register
            self._coroutines.extend(coros)
            return
        raise AssertionError("cannot call after event loop has started")

    def start(self):
        """Run all registered coroutines with `asyncio.run` until they complete.

        Raises:
            AssertionError: if no coroutines were registered
        """

        async def _do():
            await asyncio.gather(*self._coroutines)

        if not self._coroutines:
            raise AssertionError("no coroutines registered; must have at least one")
        asyncio.run(_do(), debug=_cfg.debug)
@pkconfig.parse_none
def cfg_ip(value):
    """Config parser for the IP address to listen on.

    Args:
        value (object): configured value or None
    Returns:
        object: ``value`` unchanged unless it is None, in which case
            "0.0.0.0" in dev mode and `pkconst.LOCALHOST_IP` otherwise
    """
    if value is not None:
        return value
    if pkconfig.in_dev_mode():
        return "0.0.0.0"
    return pkconst.LOCALHOST_IP
def cfg_port(value):
    """Config parser for the port to listen on.

    Args:
        value (object): anything `int` accepts
    Returns:
        int: the port, which must be within 3000 to 32767 inclusive;
            otherwise `pkconfig.raise_error` is called
    """
    lowest, highest = 3000, 32767
    port = int(value)
    if port < lowest or port > highest:
        pkconfig.raise_error(f"value must be from {lowest} to {highest}")
    return port
def create_task(coro):
    """Schedule ``coro`` as a task and hold a reference until it is done.

    The task is stored in a module-level set so the garbage collector
    cannot reclaim it before it has run; it is removed from the set
    when the task is done.

    https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task

    Args:
        coro (coroutine): to be wrapped in a task
    Returns:
        asyncio.Task: the new task
    """
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    # discard (not remove) so a missing entry is never an error
    task.add_done_callback(_background_tasks.discard)
    return task
async def sleep(secs):
    """Suspend the current task via `asyncio.sleep`.

    Args:
        secs (float): how long to sleep in seconds
    """
    await asyncio.sleep(delay=secs)
def _init():
    """Initialize the module's ``_cfg`` from `pkconfig.init`; does nothing once ``_cfg`` is set."""
    global _cfg

    if not _cfg:
        _cfg = pkconfig.init(
            debug=(pkconfig.in_dev_mode(), bool, "enable debugging for asyncio"),
            server_ip=(None, cfg_ip, "ip to listen on"),
            server_port=("9001", cfg_port, "port to listen on"),
            verify_tls=(
                not pkconfig.channel_in("dev"),
                bool,
                "validate TLS certificates on requests; for self-signed set to False",
            ),
        )