Source code for pykern.pkcli.test

"""run test files in separate processes

:copyright: Copyright (c) 2019 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""

from pykern import pkconfig
from pykern import pkconst
from pykern import pkio
from pykern import pkunit
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdp
import os
import pykern.pkcli
import pykern.pkcompat
import re
import signal
import subprocess
import sys
import time


SUITE_D = "tests"
_COROUTINE_NEVER_AWAITED = re.compile(
    r"(.+ coroutine \S+ was never awaited.*)", flags=re.MULTILINE
)
_TEST_SKIPPED = re.compile(r"^.+\s+SKIPPED\s+\(.+\)$", flags=re.MULTILINE)
_TEST_PY = re.compile(r"_test\.py$")

_FAIL_MSG = "FAIL"

_PASS_MSG = "pass"

_TIMED_OUT_MSG = _FAIL_MSG + " (timed out after {} seconds)"

_SKIPPED_MSG = "skipped"

_MAX_RESTARTS = 3

_RESTART_MSG = "restart"

_START_MSG = "start"

_WAIT_LOOP_SLEEP = 0.1

_cfg = pkconfig.init(
    ignore_warnings=(False, bool, "override pytest's output of all warnings"),
    max_case_secs=(300, pkconfig.parse_seconds, "max run time for a case"),
    max_failures=(
        5,
        pkconfig.parse_positive_int,
        "maximum number of test failures before exit",
    ),
    max_procs=(1, pkconfig.parse_positive_int, "maximum number of parallel test runs"),
    restartable=(
        False,
        bool,
        "allow pkunit.restart_or_fail to restart",
    ),
)


[docs] def default_command(*args): """Run tests one at a time with py.test. Searches in ``tests`` sub-directory if not provided a list of tests or not in tests/ already Arguments are directories or files, which are searched for _test.py files. An argument which is ``case=<pattern>``, is passed to pytest as ``-k <pattern>``. An argument of the form ``<file>::<case>`` runs that specific test by node ID. These two forms are mutually exclusive. ``skip_past=<last_to_ignore>`` causes collection to ignore all files up to and including ``<last_to_ignore>``` (may be partial match of the test file). ``max_procs=2`` starts two cases simultaneously. Default is 1, or config value `PYKERN_PKCLI_TEST_MAX_PROCS` if set. Writes failures to ``<base>_test.log`` Args: args (str): test dirs, files, options Returns: str: passed=N if all passed, else raises `pkcli.Error` """ return _Runner(args).result
class _Case: def __init__(self, entry, runner): self.timed_out_secs = 0 self.runner = runner self.rel_path = entry.path self.case_func = entry.case_func self.abs_path = pkio.py_path(self.rel_path) self.tries = _MAX_RESTARTS if _cfg.restartable else 1 self.run() def is_done(self, aborting): if (e := self.process.poll()) is None: return None return self._exit(e, aborting) def kill_after_timeout(self, run_secs): self.timed_out_secs = run_secs self.process.send_signal(signal.SIGABRT) for _ in range(5): time.sleep(0.1) if self.process.poll() is not None: return self.process.kill() def pkdebug_str(self): return f"{self.__class__.__name__}({self.rel_path})" def run(self): self.tries -= 1 self.restartable = self.tries > 0 self.process = self._start() self.started = pykern.pkcompat.utcnow() def _exit(self, returncode, aborting): def _forced_fail(msg): with self.output_path.open(mode="a") as f: f.write(msg) return _FAIL_MSG def _msg(): if 0 == returncode: return _PASS_MSG if 5 == returncode: # 5 means test was skipped # see http://doc.pytest.org/en/latest/usage.html#possible-exit-codes return _SKIPPED_MSG if not aborting and self.restartable and 2 == returncode: # 2 means KeyboardInterrupt # POSIT: pkunit.restart_or_fail uses this return _RESTART_MSG return _FAIL_MSG def _skipped(ouput): return _TEST_SKIPPED.findall(ouput) self.runner.signal_cascade.processes.remove(self.process) self.process = None o = pkio.read_text(self.output_path) self.skipped = _skipped(o) if _TEST_SKIPPED.search(o) else [] if self.timed_out_secs: return _forced_fail(f"TIMEOUT after {self.timed_out_secs} seconds\n") if m := re.findall(_COROUTINE_NEVER_AWAITED, o): return _forced_fail("".join(f"ERROR: {x}\n" for x in m)) return _msg() def _start(self): def _env(): res = os.environ.copy() res.update( { pkunit.TEST_FILE_ENV: str(self.abs_path), pkunit.RESTARTABLE: "1" if self.restartable else "", } ) return res def _ignore_warnings(): if not _cfg.ignore_warnings: return [] rv = [] for w in ( # https://docs.python.org/3/library/warnings.html#default-warning-filter "DeprecationWarning", "PendingDeprecationWarning", "ImportWarning", "ResourceWarning", ): rv.extend(("-W", f"ignore::{w}")) return rv def _remove_work_dir(): pkio.unchecked_remove(pkunit.test_path_to_work_dir(self.rel_path)) def _process(): c = ( ["pytest"] + _ignore_warnings() + [ "--tb=native", "-v", "-s", "-rs", self.case_func or self.rel_path, ] + self.runner.pytest_flags ) v = _env() with self.output_path.open("w") as o, open(os.devnull) as i: return subprocess.Popen( c, stdin=i, stdout=o, stderr=subprocess.STDOUT if o else None, env=v, close_fds=True, ) self.output_path = self.abs_path.new(ext=".log") _remove_work_dir() pkio.unchecked_remove(self.output_path) rv = _process() self.runner.signal_cascade.processes.add(rv) return rv class _Runner: def __init__(self, args): def _too_many_failures(): if len(self.failures) < _cfg.max_failures: return False self._info( None, [ f"too many failures={len(self.failures)} aborting" + ( "; waiting for processes to finish" if self.max_procs > 1 and self.cases else "" ), ], ) return True self._args(args) if not self.rel_paths: pykern.pkcli.command_error("no tests found") c = 0 self.failures = [] self.cases = set() with _SignalCascade() as self.signal_cascade: for v in self.rel_paths: c += 1 self._run(v) if a := _too_many_failures(): break while self._wait_for_one(aborting=a): pass self._assert_failures(self.failures, c) self.result = f"passed={c}" def _args(self, tests): def _case_funcs(case_funcs, cwd): for p, c in case_funcs: if not (t := pkio.py_path(p)).exists(): pykern.pkcli.command_error("not found test={}", t) _file(str(cwd.bestrelpath(t)), case_func=c) def _file(path, case_func=None): if self.skip_past: if self.skip_past in path: self.skip_past = None return self.rel_paths.append(PKDict(path=path, case_func=case_func)) def _find(paths): i = re.compile(r"(?:_work|_data)/") cwd = pkio.py_path() for t in _resolve_test_paths(paths, cwd): t = pkio.py_path(t) if not t.exists(): pykern.pkcli.command_error("test={} does not exist", t) if t.check(file=True): _file(str(cwd.bestrelpath(t))) continue for p in pkio.walk_tree(t, _TEST_PY): p = str(cwd.bestrelpath(p)) if not i.search(p): _file(p) def _flag(name, value): rv = False if len(value) <= 0: pykern.pkcli.command_error(f"empty value for option={name}") elif name == "case": rv = True self.pytest_flags.extend(("-k", value)) elif name == "max_procs": try: self.max_procs = int(value) except Exception: self.max_procs = -1 if self.max_procs <= 0: pykern.pkcli.command_error( f"max_procs={value} must be a positive integer" ) elif name == "skip_past": if self.skip_past: pykern.pkcli.command_error( f"skip_past={value} passed twice (skip_past={self.skip_past})" ) self.skip_past = value else: pykern.pkcli.command_error(f"unsupported option={name}") return rv def _resolve_test_paths(paths, current_dir): if not paths: p = current_dir if p.basename != SUITE_D: p = SUITE_D paths = (p,) return paths case_flag = False paths = [] case_funcs = [] self.pytest_flags = [] self.max_procs = _cfg.max_procs self.skip_past = None for t in tests: if "=" in t: case_flag = _flag(*(t.split("="))) elif "::" in t: v = t.split("::", 1) case_funcs.append((v[0], t)) else: paths.append(t) self.rel_paths = [] if case_funcs: if case_flag: pykern.pkcli.command_error("use case= or test::case, not both") _case_funcs(case_funcs, pkio.py_path()) if paths or not case_funcs: _find(paths) def _assert_failures(self, failures, count): if len(failures) <= 0: return # Avoid dumping too many test logs for c in failures[: _cfg.max_failures]: self._info(None, [pkio.read_text(c.output_path)]) pykern.pkcli.command_error( f"FAILED={len(failures)} passed={count - len(failures)}", ) def _exit(self, case, msg): if rv := msg != _RESTART_MSG: self.cases.remove(case) if _FAIL_MSG in msg: self.failures.append(case) self._info(case, [msg] + case.skipped) case.skipped = None return rv def _info(self, case, lines): if case is None: if not lines[-1].endswith("\n"): # other output on its own line, ensure newline at end lines[-1] += "\n" else: v = case.case_func or case.rel_path if lines[0] == _FAIL_MSG: # add the failure context lines[0] += f" {case.output_path}" if self.max_procs > 1: # line by line when multiprocess lines[0] = v + " " + lines[0] elif lines[0] == _START_MSG: # starting a case lines[0] = v else: # completing a case lines[0] = " " + lines[0] lines[-1] += "\n" o = "\n".join(lines) if self.max_procs > 1 and not o.endswith("\n"): o += "\n" pkconst.builtin_print(o, end="") # TODO(robnagler) is this necessary? sys.stdout.flush() def _run(self, entry): c = _Case(entry, self) self.cases.add(c) self._info(c, [_START_MSG]) if len(self.cases) >= self.max_procs: self._wait_for_one() def _wait_for_one(self, aborting=False): def _check_run_time(case, now): if (t := (now - case.started).seconds) > _cfg.max_case_secs: case.kill_after_timeout(t) while self.cases: n = pykern.pkcompat.utcnow() for c in self.cases: if (m := c.is_done(aborting)) is None: # wait for next loop _check_run_time(c, n) continue if self._exit(c, m): return True c.run() time.sleep(_WAIT_LOOP_SLEEP) return False class _SignalCascade: _SIGNALS = (signal.SIGTERM, signal.SIGINT) _IGNORE_HANDLERS = frozenset((None, signal.SIG_IGN, signal.SIG_DFL)) def __init__(self): self.processes = set() def __enter__(self): self._prev_handlers = PKDict({s: signal.getsignal(s) for s in self._SIGNALS}) for s in self._prev_handlers.keys(): signal.signal(s, self._handler) return self def __exit__(self, type, value, traceback): for s, h in self._prev_handlers.items(): signal.signal(s, h) def _handler(self, sig, frame): for p in self.processes: try: p.send_signal(sig) except Exception: pass if (h := self._prev_handlers.get(sig)) not in self._IGNORE_HANDLERS: h(sig, frame)