Source code for pykern.pkio

"""Useful I/O operations

:copyright: Copyright (c) 2015 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""

# Root module: Limit imports to avoid dependency issues
from pykern import pkconst
from pykern import pkinspect
import pykern.util
import contextlib
import errno
import filecmp
import glob
import io
import os
import os.path
import py
import re
import shutil

#: used during unit testing see ``pykern.pkunit.save_chdir``
pkunit_prefix = None

TEXT_ENCODING = "utf-8"


def atomic_write(path, contents=None, writer=None, **kwargs):
    """Replace a file by writing a temporary sibling and renaming it

    The temporary file is `path` with its extension replaced by
    ``pkio-tmp-<random>``. After a successful write it is renamed to
    `path`. If the temporary file still exists on exit (e.g. the write
    failed), it is removed and any error from that removal is ignored.

    Args:
        path (str or py.path.Local): Path of file to overwrite
        contents (object): New contents [None]
        writer (callable): called with the temporary path as arg [None]
        kwargs (kwargs): to pass to `py.path.local.write`

    Raises:
        AssertionError: temporary file already exists, or neither
            `contents` nor `writer` was supplied
    """
    target = py_path(path)
    tmp = target.new(ext="pkio-tmp-" + pykern.util.random_base62())
    assert not tmp.exists(), f"{tmp} already exists (file name collision)"
    try:
        if contents is None and writer is None:
            raise AssertionError("must supply writer or contents")
        if contents is not None:
            tmp.write(contents, **kwargs)
        else:
            writer(tmp)
        tmp.rename(path)
    finally:
        # unchecked_remove is too brutal for this specific case
        if tmp.exists():
            with contextlib.suppress(Exception):
                os.remove(str(tmp))
def compare_files(path1, path2, force=False):
    """Compares two files using `filecmp.cmp`

    By default `filecmp.cmp` is called with ``shallow=True``, so it may
    decide from `os.stat` information alone. `filecmp` also caches prior
    results of content comparisons.

    `force` clears that cache and requests a non-shallow comparison.
    Since the cache is global, a cache-free comparison can't be
    guaranteed in multithreaded environments.

    Args:
        path1 (str or py.path): first file
        path2 (str or py.path): second file
        force (bool): if True, call `filecmp.clear_cache` before comparison
            and ignore stats.

    Returns:
        bool: result of `filecmp.cmp`; False if a file is not found
    """
    if force:
        filecmp.clear_cache()
    try:
        same = filecmp.cmp(str(path1), str(path2), shallow=not force)
    except Exception as err:
        # A missing file means "not equal"; anything else is a real error
        if not exception_is_not_found(err):
            raise
        return False
    return same
def exception_is_not_found(exc):
    """True if exception is one of the various file not found errors

    Checks `FileNotFoundError`, `IOError` with `errno.ENOENT`, and
    `py.error.ENOENT`, in that order.

    Args:
        exc (BaseException): to check

    Returns:
        bool: True if is a file not found exception.
    """
    if isinstance(exc, FileNotFoundError):
        return True
    if isinstance(exc, IOError) and exc.errno == errno.ENOENT:
        return True
    return isinstance(exc, py.error.ENOENT)
def has_file_extension(filename, to_check):
    """Test whether the file's extension is one of `to_check`

    The extension of `filename` is taken without its leading '.' and
    lowercased before it is looked up in `to_check`.

    Args:
        filename (str|py.path.local): what to check
        to_check (str|tuple|list): is without '.' and lower

    Returns:
        bool: if any of the extensions matches
    """
    # A lone string is wrapped so `in` tests equality, not substring
    candidates = (
        (to_check,) if isinstance(to_check, pkconst.STRING_TYPES) else to_check
    )
    suffix = py_path(filename).ext
    return suffix[1:].lower() in candidates
def is_pure_text(filepath, test_size=512):
    """Read test_size bytes of filepath to determine if it is likely a text file.

    See `pykern.util.is_pure_text` for the heuristics used to test bytes.

    Args:
        filepath (str|py.path): file to check
        test_size (int): number of bytes to read from filename

    Returns:
        bool: True if file is likely pure text, false if likely binary
    """
    # Read one extra byte so we can tell whether the sample was truncated
    with open(filepath, "rb") as stream:
        head = stream.read(test_size + 1)
    sample = head[:test_size]
    return pykern.util.is_pure_text(sample, is_truncated=len(head) > test_size)
def mkdir_parent(path):
    """Create the directory and its parents (if necessary)

    Args:
        path (str): dir to create

    Returns:
        py.path.local: path
    """
    directory = py_path(path)
    return directory.ensure(dir=True)
def mkdir_parent_only(path):
    """Create the parent directory of `path` (and its parents).

    Args:
        path (str): child of the dir to create

    Returns:
        py.path.local: parent directory of path
    """
    parent = py_path(path).dirname
    return mkdir_parent(parent)
def open_text(filename, **kwargs):
    """Open file with utf-8 for text.

    ``mode`` defaults to ``"rt"`` and ``encoding`` to `TEXT_ENCODING`;
    either may be overridden through `kwargs`.

    Args:
        filename (str or py.path.Local): File to open
        kwargs (kwargs): passed to `io.open`

    Returns:
        object: open file handle
    """
    options = dict(mode="rt", encoding=TEXT_ENCODING)
    options.update(kwargs)
    return io.open(str(py_path(filename)), **options)
[docs] def py_path(path=None): """Creates a py.path.Local object Will expanduser, if needed. If `pkunit_prefix` is set, will prefix, too. Args: path (str): path to convert (or None for current dir) Returns: py.path.Local: path """ global pkunit_prefix res = py.path.local(path, expanduser=True) if pkunit_prefix: # Allow for <test>_work and <test>_data so we don't add # prefix if there's a common parent directory. if not str(res).startswith(pkunit_prefix.dirname): res = pkunit_prefix.join(res) py.path.local(res.dirname).ensure(dir=True) return res
def random_base62(*args, **kwargs):
    """DEPRECATED call `pykern.util.random_base62`

    Forwards all positional and keyword arguments unchanged.
    """
    return pykern.util.random_base62(*args, **kwargs)
def read_binary(filename):
    """Open file, read binary, and close.

    Args:
        filename (str or py.path.Local): File to open

    Returns:
        bytes: contents of `filename`
    """
    source = py_path(filename)
    return source.read_binary()
def read_text(filename):
    """Open file, read with utf-8 text, and close.

    Any exception is annotated with ``filename=<filename>`` via
    `pkinspect.append_exception_reason` and re-raised.

    Args:
        filename (str or py.path.Local): File to open

    Returns:
        str: contents of `filename`
    """
    try:
        with open_text(filename) as stream:
            text = stream.read()
    except Exception as err:
        pkinspect.append_exception_reason(err, f"filename={filename}")
        raise
    return text
@contextlib.contextmanager
def save_chdir(dirname, mkdir=False, is_pkunit_prefix=False):
    """Save current directory, change to directory, and restore.

    On exit the previous working directory is restored, and so is the
    previous value of `pkunit_prefix` when `is_pkunit_prefix` is set.

    Args:
        dirname (str): directory to change to
        mkdir (bool): Make the directory?
        is_pkunit_prefix (bool): If True, sets pkunit_prefix.

    Yields:
        py.path.local: real path of the directory changed to
    """
    global pkunit_prefix

    original_dir = py.path.local().realpath()
    original_prefix = pkunit_prefix
    try:
        # When establishing the prefix, don't apply the old prefix to dirname
        target = py.path.local(dirname) if is_pkunit_prefix else py_path(dirname)
        if mkdir and not target.check(dir=1):
            mkdir_parent(target)
        os.chdir(str(target))
        if is_pkunit_prefix:
            pkunit_prefix = py.path.local(target)
        yield target.realpath()
    finally:
        os.chdir(str(original_dir))
        if is_pkunit_prefix:
            pkunit_prefix = original_prefix
def sorted_glob(pattern, key=None):
    """Returns sorted list of files & dirs matching pattern.

    Use '**' in pattern for recursive search, else, use * as wildcard.

    Sorts using key if provided, else in ascending order.

    Doesn't include dot files unless dot "." is included explicitly at the
    start of a path component. Doesn't include . and ..

    To return files only, see walk_tree().

    Args:
        pattern (py.path.Local or str): to match file paths
        key (str): used to sort, must be name of py.path.Local attribute

    Returns:
        list: py.path.Local objects in sorted order
    """

    def _sort_value(entry):
        # The named attribute may be a plain value or a method to call
        attr = getattr(entry, key)
        return attr() if callable(attr) else attr

    matches = [py_path(m) for m in glob.iglob(str(pattern), recursive=True)]
    if key:
        return sorted(matches, key=_sort_value)
    return sorted(matches)
def unchecked_remove(*paths):
    """Remove files or directories, ignoring OSError.

    Will not remove '/' or '.'

    Each path is first tried with `os.remove`; if that raises `OSError`,
    `shutil.rmtree` is tried with ``ignore_errors=True``.

    Args:
        paths (str): paths to remove

    Raises:
        AssertionError: a path is the root directory or the current
            directory
    """
    cwd = py_path()
    for a in paths:
        p = py_path(a)
        # Explicit raises rather than `assert` so these safety guards
        # are still enforced when Python runs with -O.
        if len(p.parts()) <= 1:
            raise AssertionError("{}: will not remove root directory".format(p))
        if cwd == p:
            raise AssertionError("{}: will not remove current directory".format(p))
        # NOTE(review): the guards inspect py_path(a) while removal uses
        # str(a) unmodified -- confirm this difference is intended.
        try:
            os.remove(str(a))
        except OSError:
            try:
                shutil.rmtree(str(a), ignore_errors=True)
            except OSError:
                pass
def walk_tree(dirname, file_re=None):
    """Returns list of all files (only) in dirname (recursive), sorted in ascending order.

    Include file_re to filter results; it is searched against each
    file's path relative to `dirname`.

    Includes dot files, but not . and ..

    To include dirs in results, see sorted_glob().

    Args:
        dirname (str): top-level directory to walk
        file_re (re or str): Optionally, only return files which match the regular expression

    Returns:
        list: py.path.Local objects in sorted order
    """

    def _files(top):
        for root, _dirs, names in os.walk(top):
            base = py_path(root)
            yield from (base.join(n) for n in names)

    if not file_re:
        return sorted(_files(dirname))
    # Accept either a compiled pattern or a pattern string
    pattern = file_re if hasattr(file_re, "search") else re.compile(file_re)
    top = py_path(dirname)
    return sorted(
        [p for p in _files(dirname) if pattern.search(top.bestrelpath(p))]
    )
def write_binary(path, contents):
    """Open file, write binary, and close.

    Args:
        path (str or py.path.Local): Path of file to write to
        contents (bytes): New contents

    Returns:
        py.path.local: `path` as :class:`py.path.Local`
    """
    target = py_path(path)
    target.write_binary(contents)
    return target
def write_text(path, contents):
    """Open file, write text with utf-8, and close.

    `contents` is passed through `pykern.pkcompat.from_bytes` before it
    is written. Any exception is annotated with ``path=<path>`` via
    `pkinspect.append_exception_reason` and re-raised.

    Args:
        path (str or py.path.Local): Path of file to write to
        contents (str or bytes): New contents

    Returns:
        py.path.local: `path` as :class:`py.path.Local`
    """
    from pykern import pkcompat

    target = py_path(path)
    try:
        with io.open(str(target), mode="wt", encoding=TEXT_ENCODING) as out:
            out.write(pkcompat.from_bytes(contents))
    except Exception as err:
        pkinspect.append_exception_reason(err, f"path={path}")
        raise
    return target