"""Useful operations for unit tests
:copyright: Copyright (c) 2015 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
# defer importing pkconfig
from pykern import pkcompat
from pykern import pkconst
from pykern import pkinspect
from pykern import pkio
import contextlib
import functools
import http.server
import importlib
import inspect
import json
import os
import py
import pykern.pkconst
import pykern.util
import pytest
import re
import subprocess
import sys
import threading
import traceback
#: Environment var set by pykern.pkcli.test for each module under test
TEST_FILE_ENV = "PYKERN_PKUNIT_TEST_FILE"
#: Environment var set by pykern.pkcli.test if the test is restartable
RESTARTABLE = "PYKERN_PKUNIT_RESTARTABLE"
#: Where persistent input files are stored (test_base_name_data)
DATA_DIR_SUFFIX = "_data"
#: Used to create test servers
LOCALHOST_IP = pkconst.LOCALHOST_IP
#: Where to write temporary files (test_base_name_work)
WORK_DIR_SUFFIX = "_work"
#: Where `ExceptToFile` writes exception
PKEXCEPT_PATH = "pkexcept"
#: Where `ExceptToFile` writes stack
PKSTACK_PATH = "pkstack"
#: INTERNAL: Set to the most recent test module by `_test_file`
module_under_test = None
#: Type of a regular expression
_RE_TYPE = type(re.compile(""))
#: used by case_dirs for comparing sheets
_CSV_SHEET_ID = re.compile(r"(.+)#(\d)$")
#: _test_file initialized?
_init_test_file = False
#: module being run by `pykern.pkcli.test`
__test_file = None
[docs]
class PKFail(AssertionError):
pass
[docs]
class ExceptToFile:
"""Writes exception or None to `PKEXCEPT_PATH`
Used for deviance testing with `case_dirs`.
If there is an exception, writes that to the file. Otherwise, writes "None"
If there is an exception, will write `PKSTACK_PATH`. Otherwise, no
file exists. Used for diagnostics only.
Usage::
for d in case_dirs():
with ExceptToFile():
command to test
Returns:
None: just for context manager
"""
def __enter__(self):
return None
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
r = str(None)
else:
r = re.sub(
pkio.py_path().dirname + r"\S*/", "", str(exc_val), flags=re.IGNORECASE
)
with open(PKSTACK_PATH, "wt") as f:
traceback.print_exception(exc_type, exc_val, exc_tb, file=f)
pkio.write_text(PKEXCEPT_PATH, r + "\n")
return True
[docs]
def assert_object_with_json(
basename,
actual,
):
"""Converts actual to JSON and compares with data_dir/basename.json
Reads data_dir/basename.json and compares with actual
converted to json. Trailing newline is managed properly. The
keys are sorted and indentation is 4. actual written to work_dir.
Args:
expected_basename (str): file to be found in data_dir with json suffix
actual (object): to be serialized as json
"""
from pykern.pkdebug import pkdpretty
actual = pkdpretty(actual)
fn = "{}.json".format(basename)
a = work_dir().join(fn)
pkio.write_text(a, actual)
e = data_dir().join(fn)
expect = pkio.read_text(e)
pkeq(expect, actual, "diff {} {}", e, a)
[docs]
def case_dirs(group_prefix="", **kwargs):
"""Sets up `work_dir` by iterating ``*.in`` in `data_dir`
Every ``<case-name>.in`` is copied recursively to ``<case-name>`` in
`work_dir`. This function then yields that directory. The test can
then run the function to be tested.
When test yields to the generator, this function looks for all
files in `data_dir` in the sub-directory ``<case-name>.out``. Each
of these expect files is copmared to the corresponding `work_dir`
actual file using `file_eq`.
If you want to only use cases from some specific `<case-name>.in`
subdir, and not all `*.in` subdirs, you can pass a `group_prefix`
default parameter value ('' by default) to `case_dirs()`. This will
perform the regular operations but only on `<case-name>.in`.
Excel spreadsheets are supported. If the name of the expect
(out) file is ``foo.csv``, then the first sheet (sheet 0) in the
corresponding work_dir xlsx will be converted to ``foo.csv``
before comparison. If the expect (out) file has a ``#<digit>``,
e.g. ``foo#3.csv``, then the fourth sheet will be extracted from
the actual xlsx to ``foo#3.csv`` in the work_dir.
If ExceptToFile in the body of a case_dirs loop, the exception
will be output if a file is not found.
Args:
group_prefix (string): target subdir ['']
j2_ctx (dict): passed to `pykern.pkjinja.render_file`
ignore_lines (iterable): `POSIX standard regular expressions <https://www.gnu.org/software/findutils/manual/html_node/find_html/posix_002dbasic-regular-expression-syntax.html>`_ to be passed to `diff`
is_bytes (bool): do a binary comparison [False]
Returns:
py.path.local: case directory created in work_dir (also PWD)
"""
import shutil
def _compare(in_d, work_d):
o = in_d.new(ext="out")
for e in pkio.walk_tree(o):
if e.basename.endswith("~"):
continue
a = work_d.join(o.bestrelpath(e))
file_eq(
expect_path=e,
actual_path=a,
**kwargs,
)
d = work_dir()
n = 0
for i in pkio.sorted_glob(data_dir().join(group_prefix + "*.in")):
w = d.join(i.purebasename)
shutil.copytree(str(i), str(w))
n += 1
with pkio.save_chdir(w):
_pkdlog("case_dir={}", i.basename)
yield w
try:
_compare(i, w)
continue
except Exception as e:
# Not found indicates expected output not found.
# It might have been caused by an exception which was
# caught by ExceptToFile.
if not pkio.exception_is_not_found(e):
raise
f = w.join(PKSTACK_PATH)
if not f.exists():
raise
_pkdlog("Exception in case_dir={}\n{}", w, f.read())
# This avoids confusing "during handling of above exception"
pkfail("See stack above")
if n == 0:
pkfail(f"No files found for group_prefix={group_prefix}")
[docs]
def data_dir():
"""Compute the data directory based on the test name
The test data directory is always ``<test>_data``, where ``<test>``
is the name of the test's python module with the ``_test`` or
``test_`` removed. For example, if the test file is
``setup_test.py`` then the directory will be ``setup_data``.
Returns:
py.path.local: data directory
"""
return _base_dir(DATA_DIR_SUFFIX)
[docs]
def data_yaml(base_name):
"""Load base_name.yml from data_dir
Args:
base_name (str): name of YAML file with ``.yml`` extension
Returns:
object: YAML data structure, usually dict or array
"""
from pykern import pkyaml
return pkyaml.load_file(data_dir().join(base_name) + ".yml")
[docs]
def empty_work_dir():
"""Remove `work_dir` if it exists and create.
All contents of the test directory will be removed.
Returns:
py.path.local: empty work directory
"""
d = work_dir()
if os.path.exists(str(d)):
# doesn't ignore "not found" errors
d.remove(rec=1, ignore_errors=True)
return d.ensure(dir=True)
[docs]
def file_eq(expect_path, *args, **kwargs):
"""If actual is not expect_path, throw assertion with calling context.
`expect_path` and `actual_path` both exist, they will be compared as plain text.
If `actual_path` does not exist, it will be created from `actual`.
If `expect_path` ends in ``.json`` and `actual_path` does not exist,
`pkjson` will be used to load `expect_path` and a data structure comparison
will be used with `actual` (and `actual_path` will be written.
This allows easy testing of complex results.
If `expect_path` ends with ``.jinja``, it will be rendered
with `pykern.pkjina.render_file`, and you must supply `j2_ctx`
in kwargs.
Args:
expect_path (str or py.path): text file to be read; if str, then joined with `data_dir`
actual (object): string or json data structure; if missing, read `actual_path` (may be positional)
actual_path (py.path or str): where to write results; if str, then joined with `work_dir`; if None, ``work_dir().join(expect_path.relto(data_dir()))``
j2_ctx (dict): passed to `pykern.pkjinja.render_file`
ignore_lines (iterable): `POSIX standard regular expressions <https://www.gnu.org/software/findutils/manual/html_node/find_html/posix_002dbasic-regular-expression-syntax.html>`_ to be passed to `diff`
is_bytes (bool): do a binary comparison [False]
"""
_FileEq(expect_path, *args, **kwargs)
[docs]
@contextlib.contextmanager
def insert_data_dir_in_sys_path():
"""Context manager to insert `data_dir` first in `sys.path`"""
p = sys.path
try:
sys.path = [str(data_dir())] + p
yield
finally:
sys.path = p
[docs]
def is_test_run():
"""Running in a test?
Returns:
bool: whether this is running in a test
"""
return bool(_test_file())
[docs]
def import_module_from_data_dir(module_name):
"""Add `data_dir` to sys.path and import module_name.
Note that `module_name` with be removed from the sys.modules cache
before loading in case the module was loaded by another test.
Args:
module_name (str): module relative to `data_dir` to import.
Returns:
module: imported module
"""
d = str(data_dir())
prev_path = sys.path
try:
sys.path = [d]
try:
del sys.modules[module_name]
except KeyError:
pass
m = importlib.import_module(module_name)
return m
finally:
sys.path = prev_path
[docs]
def pkeq(expect, actual, *args, **kwargs):
"""If actual is not expect, throw assertion with calling context.
Opposite of `pkne`.
Args:
expect (object): what to test for
actual (object): run-time value
args (tuple): passed to pkfail()
kwargs (dict): passed to pkfail()
"""
if expect != actual:
_fail(("expect={} != actual={}", expect, actual), *args, **kwargs)
[docs]
@contextlib.contextmanager
def pkexcept(exc_or_re, *args, **kwargs):
"""Expect an exception to be thrown and match or output msg
Examples::
# Expect an exception (or its subclass)
with pkexcept(AssertionError, 'did not expect this'):
assert 0
# Expect exception to contain a specific message
with pkexcept('match this', 'problem with matching'):
assert 0, 'some string with "match this" in it'
# Use a default output message
with pkexcept(KeyError):
something['key will not be found']
Args:
exc_or_re (object): BaseException, re, or str; if str, compiled with `re.IGNORECASE`
args (tuple): passed to format
kwargs (dict): passed to format
Yields:
None: just for context manager
"""
try:
yield None
except BaseException as e:
from pykern.pkdebug import pkdexc
e_str = "{} {}".format(type(e), e)
if isinstance(exc_or_re, type) and issubclass(exc_or_re, BaseException):
if isinstance(e, exc_or_re):
return
m = (
"{}: an exception was raised, but expected it to be {}; stack={}",
e_str,
exc_or_re,
pkdexc(),
)
else:
if not isinstance(exc_or_re, _RE_TYPE):
exc_or_re = re.compile(exc_or_re, flags=re.IGNORECASE)
if exc_or_re.search(e_str):
return
m = (
'{}: an exception was raised, but did not match "{}"; stack={}',
e_str,
exc_or_re.pattern,
pkdexc(),
)
else:
m = ("Exception was not raised: expecting={}", exc_or_re)
_fail(m, *args, **kwargs)
[docs]
def pkfail(fmt, *args, **kwargs):
"""Format message and raise PKFail.
Args:
fmt (str): to be passed to `string.format`
args (tuple): passed to format
kwargs (dict): passed to format
"""
msg = fmt.format(*args, **kwargs)
call = pkinspect.caller(ignore_modules=[contextlib])
raise PKFail("{} {}".format(call, msg))
[docs]
def pkne(expect, actual, *args, **kwargs):
"""If actual is equal to expect, throw assertion with calling context
Opposite of `pkeq`.
Args:
expect (object): what to test for
actual (object): run-time value
args (tuple): passed to pkfail()
kwargs (dict): passed to pkfail()
"""
if expect == actual:
_fail(("expect={} == actual={}", expect, actual), *args, **kwargs)
[docs]
def pkok(cond, fmt, *args, **kwargs):
"""If cond is not true, throw PKFail with calling context
Args:
cond (object): expression which should evaluate to true
fmt (str): to be passed to `string.format`
args (tuple): passed to format
kwargs (dict): passed to format
Returns:
object: `obj` value
"""
if not cond:
pkfail(fmt, *args, **kwargs)
return cond
[docs]
def pkre(expect_re, actual, flags=re.IGNORECASE + re.DOTALL):
"""If actual does not match (re.search) expect_re, throw PKFail with calling context.
Args:
expect_re (object): string or re object
actual (object): run-time value
flags: passed on to re.search [IGNORECASE + DOTALL]
"""
if not re.search(expect_re, pkcompat.from_bytes(actual), flags=flags):
pkfail("expect_re={} != actual={}", expect_re, actual)
[docs]
def restart_or_fail(*args, **kwargs):
"""Test will be restarted (at process level) if it can, else `pkfail`
Called by tests which experience known CI failures such
as not being able to connect to servers.
Communicates with pykern.pkcli.test
Args:
fmt (str): to be passed to `string.format`
args (tuple): passed to format
kwargs (dict): passed to format
"""
if os.environ.get(RESTARTABLE):
raise KeyboardInterrupt()
pkfail(*args, **kwargs)
[docs]
def save_chdir_work(is_pkunit_prefix=False, want_empty=True):
"""Change to `work_dir` which will be created.
Default to `empty_work_dir` before chdir.
Args:
is_pkunit_prefix (bool): use as root of (most) file I/O (optional)
want_empty (bool): call `empty_work_dir` before chdir if True [True]
Returns:
py.path.local: empty work directory
"""
return pkio.save_chdir(
empty_work_dir() if want_empty else work_dir(),
is_pkunit_prefix=is_pkunit_prefix,
)
#: DEPRECATED
unbound_localhost_tcp_port = pykern.util.unbound_localhost_tcp_port
[docs]
class WebServer:
"""Serves files from `data_dir` on a random port in a separate thread.
Usage::
with pkunit.WebServer() as server:
# server.url is "http://localhost:<port>"
do_something(server.url)
"""
def __enter__(self):
h = functools.partial(
http.server.SimpleHTTPRequestHandler,
directory=str(data_dir()),
)
self._srv = http.server.HTTPServer(("localhost", 0), h)
self.url = f"http://localhost:{self._srv.server_address[1]}"
self._thread = threading.Thread(target=self._srv.serve_forever, daemon=True)
self._thread.start()
return self
def __exit__(self, *args):
self._srv.shutdown()
return False
[docs]
def test_path_to_work_dir(path):
"""Convert a test file path to its work directory path.
Strips ``_test`` suffix or ``test_`` prefix from the basename and
appends ``_work``.
Args:
path (str or py.path.local): test file path ending in ``_test`` or starting with ``test_``
Returns:
py.path.local: work directory path
"""
p = pkio.py_path(path)
b = _strip_test_affix(p.purebasename)
if b is None:
pkfail("{}: path must be a test file (_test suffix or test_ prefix)", p)
return p.new(basename=b + WORK_DIR_SUFFIX)
[docs]
def work_dir():
"""Returns ephemeral work directory, created if necessary.
To enable easier debugging, the test directory is always
``<test>_work``, where ``<test>`` is the name of the test's python
module with the ``_test`` or ``test_`` removed. For example, if the
test file is ``setup_test.py`` then the directory will be
``setup_work``.
The name "work" distinguishes from "tmp", which could imply
anything. Also, with editor autocomplete, "setup_work" and
"setup_test" are more easily distinguishable.
Returns:
py.path: directory name
"""
f = _test_file()
if not f:
raise PKFail("unable to find test file path; not running in pykern.pkcli.test?")
return test_path_to_work_dir(f).realpath().ensure(dir=True)
class _FileEq:
"""Implements `file_eq`"""
def __init__(self, expect_path, *args, **kwargs):
self._validate_args(expect_path, *args, **kwargs)
self._set_expect_and_actual()
self._compare()
def _actual_xlsx(self):
from pykern.pkcli import xlsx
try:
b = self._actual_path.new(ext=".xlsx")
m = _CSV_SHEET_ID.search(b.purebasename)
s = 0
if m:
b = b.new(purebasename=m.group(1))
s = int(m.group(2))
if b.check(file=True):
xlsx.to_csv(b, sheet=s, csv_path=self._actual_path)
return True
return False
except Exception:
_pkdlog(
"ERROR converting xlsx to csv expect={} actual={}",
self._expect_path,
self._actual_path,
)
raise
def _compare(self):
w = work_dir()
def _cmd():
if self.is_bytes:
r = ["cmp"]
else:
r = ["diff"]
for l in self._ignore_lines or ():
r.extend(("-I", l))
return r + [str(self._expect_path), str(self._actual_path)]
def _failed_msg(process):
r = "'" + "' '".join(process.args) + f"'\n" + process.stdout + "\n"
if not (process.returncode == 1 or self.is_bytes):
return r + "diff command failed\n"
if self._expect_is_jinja:
return (
r
+ f"""
Implementation restriction: expect is a jinja template which has been processed to
produce the diff. A simple copy of actual to expect is not possible. You will need to update
the expect jinja template={self._expect_path} manually.
"""
)
return r + self._update_message
def _ndiff_config(work_d):
return pykern.pkio.write_text(
work_d.join("ndiff_conf.txt"),
f"* * {'abs' if self._ndiff_epsilon_is_abs else 'rel'}={float(self._ndiff_epsilon)}",
)
def _ndiff_files(expect_path, actual_path):
p = subprocess.run(
(
"ndiff",
actual_path,
expect_path,
_ndiff_config(w),
),
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
)
d = pkcompat.from_bytes(p.stderr)
if not re.search(r"processing '.*'\n\s*\d+ lines have been diffed\s*$", d):
pkfail("diffs detected: {} {}", d, self._update_message)
if self._is_ndiff:
_ndiff_files(
self._expect_path,
self._actual_path,
)
return
if self._expect == self._actual:
return
p = subprocess.run(
_cmd(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)
if p.returncode != 0:
pkfail("expect != actual:\n{}", _failed_msg(p))
def _expect_csv(self):
if not self._expect_path.ext == ".csv":
return False
self._actual_xlsx()
self._actual = pkio.read_text(self._actual_path)
self._expect = pkio.read_text(self._expect_path)
return True
def _expect_default(self):
self._expect = self._read(self._expect_path)
if self._have_actual_kwarg:
self._write(self._actual_path, self._actual)
def _expect_jinja(self):
if not self._expect_is_jinja:
return False
import pykern.pkjinja
self._expect = pykern.pkjinja.render_file(
self._expect_path, self.j2_ctx, strict_undefined=True
)
if self._have_actual_kwarg:
pkio.write_text(self._actual_path, self._actual)
return True
def _expect_json(self):
if not self._expect_path.ext == ".json" or self._actual_path.exists():
return False
self._expect = pkio.read_text(self._expect_path)
if self._have_actual_kwarg:
import pykern.pkjson
pkio.mkdir_parent_only(self._actual_path)
self._actual = pykern.pkjson.dump_pretty(
self._actual, filename=self._actual_path
)
return True
def _set_expect_and_actual(self):
self._read, self._write = (
(pkio.read_binary, pkio.write_binary)
if self.is_bytes
else (pkio.read_text, pkio.write_text)
)
self._update_message = f"""
to update test data:
cp '{self._actual_path}' '{self._expect_path}'
"""
if self._expect_csv():
assert not self.is_bytes, "csv is not compatible with is_bytes"
return
if self._have_actual_kwarg:
self._actual = self.kwargs["actual"]
if self._actual_path.exists():
pkfail(
"actual={} and actual_path={} both exist",
self._actual,
self._actual_path,
)
else:
self._actual = self._read(self._actual_path)
if self._expect_json() or self._expect_jinja():
assert not self.is_bytes, "json or jinja is not compatible with is_bytes"
return
self._expect_default()
def _validate_args(self, expect_path, *args, **kwargs):
from pykern.pkcollections import PKDict
self.kwargs = kwargs
self._have_actual_kwarg = "actual" in self.kwargs
if args:
assert (
not self._have_actual_kwarg
), f'have actual as positional arg={args[0]} and kwargs={self.kwargs["actual"]}'
assert (
len(args) == 1
), f"too many positional args={args}, may only have one (actual)"
self.kwargs["actual"] = args[0]
self._have_actual_kwarg = True
self._actual_path = kwargs.get("actual_path")
self._expect_path = expect_path
if not isinstance(self._expect_path, pykern.pkconst.PY_PATH_LOCAL_TYPE):
self._expect_path = data_dir().join(self._expect_path)
self._expect_is_jinja = self._expect_path.ext == ".jinja"
self._is_ndiff = self._expect_path.ext == ".ndiff"
self._ndiff_epsilon = kwargs.get("ndiff_epsilon", 1e-13)
self._ndiff_epsilon_is_abs = kwargs.get("ndiff_epsilon_is_abs", False)
b = (
self._expect_path.purebasename
if self._expect_is_jinja
else self._expect_path.relto(data_dir())
)
if self._actual_path is None:
self._actual_path = b
if not isinstance(self._actual_path, pykern.pkconst.PY_PATH_LOCAL_TYPE):
self._actual_path = work_dir().join(self._actual_path)
self._ignore_lines = kwargs.get("ignore_lines")
self.j2_ctx = kwargs.get("j2_ctx", PKDict())
self.is_bytes = kwargs.get("is_bytes", False)
def _base_dir(postfix):
"""Base name with directory.
Args:
postfix (str): what to append to base (``_data`` or ``_work``).
Returns:
py.path.local: base directory with postfix
"""
f = _test_file()
if not f:
raise PKFail("unable to find test file path; not running in pykern.pkcli.test?")
b = _strip_test_affix(f.purebasename)
assert b is not None, "{}: module name must end in _test".format(f)
return f.new(basename=b + postfix).realpath()
def _fail(std_message_args, *args, **kwargs):
"""Augment standard failure messages with args and kwargs
Args:
std_message_args (tuple): fmt string and args. eg. ("expect={} != actual={}", expect, actual)
"""
if args:
pkfail(
f"{std_message_args[0]} {args[0]}",
*std_message_args[1:],
*args[1:],
**kwargs,
)
pkfail(*std_message_args)
def _pkdlog(*args, **kwargs):
from pykern.pkdebug import pkdlog
pkdlog(*args, **kwargs)
def _strip_test_affix(purebasename):
b = re.sub(r"_test$", "", purebasename)
if b == purebasename:
b = re.sub(r"^test_", "", purebasename)
return None if b == purebasename else b
def _test_file():
"""Various ways to initialize _test_file"""
global _init_test_file, __test_file
if not _init_test_file:
_init_test_file = True
# pykern.pkcli.test
t = os.environ.get(TEST_FILE_ENV)
if t:
__test_file = py.path.local(t)
if __test_file:
return __test_file
if module_under_test:
# POSIT: pykern.pytest_plugin or sirepo/tests/conftest.py
m = module_under_test
return py.path.local(m.__file__)
# py.test alone, just guess
s = inspect.currentframe().f_back.f_back
f = None
for _ in range(100):
if s.f_code.co_filename.endswith("_test.py"):
return py.path.local(s.f_code.co_filename)
s = s.f_back
if not s:
break
return None