Source code for pykern.pkconfig

# -*- coding: utf-8 -*-
"""Declarative module configuration with dynamic value injection

Module Declaration
------------------

Modules declare their configuration via `init`. Here is how `pkdebug`
declares its config params::

    cfg = pkconfig.init(
        control=(None, re.compile, 'Pattern to match against pkdc messages'),
        want_pid_time=(False, bool, 'Display pid and time in messages'),
        output=(None, _cfg_output, 'Where to write messages either as a "writable" or file name'),
    )

A param tuple contains three values:

    0. Default value, in the expected type
    1. Callable that can convert a string or other type into a valid value
    2. A docstring briefly explaining the configuration element

The returned ``cfg`` object is ready to use after the call. It will contain
the config params as defined or an exception will be raised.

Config Values
-------------

Configuration is returned as nested dicts. The values themselves could
be any Python object. In this case, we have a string and a file object
for the two parameters. We called `os.getcwd` and referred to
`sys.stdout` in param values.

Summary
-------

Here are the steps to configuring an application:

1. When the first module calls `init`, pkconfig gets environment variables
   to create a single dict of param values, unparsed.

2. `init` looks for the module's params in the unparsed values.

3. If the parameter is found, that value is used. Else, the default is merged
   into the dict and used.

4. The parameter value is then resolved with `str.format`. If the value
   is a `list` it will be joined with any previous value (e.g. default).

5. The resolved value is parsed using the param's declared ``parser``.

6. The result is stored in the merged config and also stored in the module's
   `Params` object .

7. Once all params have been parsed for the module, `init` returns the `Params`
   object to the module, which can then use those params to initialize itself.

:copyright: Copyright (c) 2015-2019 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html

"""
from __future__ import absolute_import, division, print_function

# Import the minimum number of modules and none from pykern
# pkconfig is the first module imported by all other modules in pykern
import collections
import copy
import importlib
import inspect
import os
import re
import sys

# These modules have very limited imports to avoid loops with config imports
from pykern.pkcollections import PKDict
from pykern import pkconst
from pykern import pkinspect

#: Python version independent value of string instance check
STRING_TYPES = pkconst.STRING_TYPES

#: Environment variable holding channel (defaults to "dev")
CHANNEL_ATTR = "pykern_pkconfig_channel"

_KEY_PATTERN = "^[A-Z][A-Z0-9_]*[A-Z0-9]$"

#: Validate environ key: Must be upper case, cannot begin with non-letter or end with an underscore
_ENV_KEY_RE = re.compile(_KEY_PATTERN)

#: Validate key: Cannot begin with non-letter or end with an underscore
KEY_RE = re.compile(_KEY_PATTERN, flags=re.IGNORECASE)

#: parse_tuple splits strings on this
TUPLE_SEP = ":"

#: Order of channels from least to most stable
VALID_CHANNELS = ("dev", "alpha", "beta", "prod")

#: Channels which can have more verbose output from the server
INTERNAL_TEST_CHANNELS = VALID_CHANNELS[0:2]

#: Configuration for this module: channel
cfg = None

#: Initialized channel (same as cfg.channel)
CHANNEL_DEFAULT = VALID_CHANNELS[0]

#: Attribute to detect parser which can parse None
_PARSE_NONE_ATTR = "pykern_pkconfig_parse_none"

#: Value to add to os.environ (see `reset_state_for_testing`)
_add_to_environ = None

#: All values in environ and add_to_environ
_raw_values = None

#: All values parsed via init()
_parsed_values = None

#: Regex used by `parse_seconds`
_PARSE_SECONDS = re.compile(
    r"^(?:(\d+)d)?(?:(?:(?:(\d+):)?(\d+):)?(\d+))?$",
    flags=re.IGNORECASE,
)

#: Regex used by `parse_bytes`
_PARSE_BYTES = re.compile(r"^(\d+)([kmgtp]?)b?$", flags=re.IGNORECASE)

#: multiplier used for qualifier on `parse_bytes`
_PARSE_BYTES_MULTIPLIER = PKDict(
    k=1024,
    m=1024**2,
    g=1024**3,
    t=1024**4,
)


[docs] class ReplacedBy(tuple, object): """Container for a required parameter declaration. Example:: cfg = pkconfig.init( gone=pkconfig.ReplacedBy('pykern.pkexample.foo'), ) Args: new_name: name of new config parameter """ @staticmethod def __new__(cls, new_name): msg = "replaced by name=${}".format(new_name.upper().replace(".", "_")) return super(ReplacedBy, cls).__new__( cls, (None, lambda x: raise_error(msg), msg), )
[docs] class Required(tuple, object): """Container for a required parameter declaration. Example:: cfg = pkconfig.init( any_param=(1, int, 'A parameter with a default'), needed=pkconfig.Required(int, 'A parameter without a default'), ) Args: converter (callable): how to string to internal value docstring (str): description of parameter """ @staticmethod def __new__(cls, *args): assert len(args) == 2, "{}: len(args)!=2".format(args) return super(Required, cls).__new__(cls, (None,) + args)
[docs] class RequiredUnlessDev(tuple, object): """Container for a required parameter declaration only `in_dev_mode`. Only required if `in_dev_mode` is true. Example:: cfg = pkconfig.init( maybe_needed=pkconfig.RequiredUnlessDev('dev default', str, 'A parameter with a default'), ) Args: dev_default (object): value compatible with type converter (callable): how to string to internal value docstring (str): description of parameter """ @staticmethod def __new__(cls, *args): assert len(args) == 3, "{}: len(args)!=3".format(args) if in_dev_mode(): return args return Required(args[1], args[2])
[docs] def append_load_path(load_path): """DEPRECATED""" pass
[docs] def channel_in(*args, **kwargs): """Test against configured channel Args: args (str): list of channels to valid channel (str): channel to test (default: [cfg.channel]) Returns: bool: True if current channel in ``args`` """ if not cfg: _coalesce_values() res = False to_test = cfg.channel if kwargs and kwargs["channel"]: to_test = kwargs["channel"] assert to_test in VALID_CHANNELS, "{}: invalid channel keyword arg".format( to_test ) for a in args: assert a in VALID_CHANNELS, "{}: invalid channel to test".format(a) if a == to_test: res = True return res
[docs] def channel_in_internal_test(channel=None): """Is this a internal test channel? Args: channel (str): channel to test (default: [cfg.channel]) Returns: bool: True if current channel in (alpha, dev) """ return channel_in(*INTERNAL_TEST_CHANNELS, channel=channel)
[docs] def init(**kwargs): """Declares and initializes config params for calling module. Args: kwargs (dict): param name to (default, parser, docstring) Returns: Params: `PKDict` populated with param values """ if "_caller_module" in kwargs: # Internal use only: _values() calls init() to initialize pkconfig.cfg m = kwargs["_caller_module"] del kwargs["_caller_module"] else: if pkinspect.is_caller_main(): pkconst.builtin_print( "pkconfig.init() called from __main__; cannot configure, ignoring", file=sys.stderr, ) return None m = pkinspect.caller_module() mnp = m.__name__.split(".") for k in reversed(mnp): kwargs = {k: kwargs} decls = {} _flatten_keys([], kwargs, decls) _coalesce_values() res = PKDict() _iter_decls(decls, res) for k in mnp: res = res[k] return res
[docs] def in_dev_mode(): """Turn on developer features Returns: bool: value of `pykern.pykern.cfg.dev_mode` """ if not cfg: _coalesce_values() return cfg.dev_mode
[docs] def flatten_values(base, new): """Merge flattened values into base Keys are made all lowercase. Lists instances are prepended and not recursively merged. Args: base (object): dict-like that is already flattened new (object): dict-like that will be flattened and overriden Returns: dict: modified `base` """ new_values = {} _flatten_keys([], new, new_values) # TODO(robnagler) Verify that a value x_y_z isn't set when x_y # exists already as a None. The other way is ok, because it # clears the value unless of course it's not a dict # then it would be a type collision for k in sorted(new_values.keys()): n = new_values[k] if k in base: b = base[k] if isinstance(b, list) or isinstance(n, list): if b is None or n is None: pass elif isinstance(b, list) and isinstance(n, list): n.extend(b) else: raise_error( "{}: type mismatch between new value ({}) and base ({})".format( k.msg, n, b ), ) base[k] = n return base
[docs] def parse_none(func): """Decorator for a parser which can parse None Args: callable: function to be decorated Returns: callable: func with attr indicating it can parse None """ setattr(func, _PARSE_NONE_ATTR, True) return func
[docs] @parse_none def parse_bool(value): """Default parser for `bool` types When the parser is `bool`, it will be replaced with this routine, which parses strings and None specially. `bool` values cannot be defaulted to `None`. They must be True or False. String values which return true: t, true, y, yes, 1. False values: f, false, n, no, 0, '', None Other values are parsed by `bool`. Args: value (object): to be parsed Returns: bool: True or False """ if value is None: return False if isinstance(value, bool): return value if not isinstance(value, STRING_TYPES): return bool(value) v = value.lower() if v in ("t", "true", "y", "yes", "1"): return True if v in ("f", "false", "n", "no", "0", ""): return False raise_error("unknown boolean value={}".format(value))
[docs] def parse_bytes(value): """Parse bytes in `int` or n[KMGT]B? formats Args: value (object): to be parsed Returns: int: non-negative number of bytes """ if isinstance(value, int): if value < 0: raise_error("bytes may not be negative value={}".format(value)) return value if not isinstance(value, str): raise_error("bytes must be int or str value={}".format(value)) m = _PARSE_BYTES.search(value) if not m: raise_error("bytes must match n[KMGT]B? value={}".format(value)) v = int(m.group(1)) x = m.group(2) if x: v *= _PARSE_BYTES_MULTIPLIER[x.lower()] return v
[docs] def parse_positive_int(value): """Parse is a positive integer Args: value (object): to be parsed Returns: int: positive value """ if isinstance(value, str): try: value = int(value) except Exception as e: raise_error(f"value={value} not an integer; exception={e}") elif not isinstance(value, int): raise_error(f"value={value} must be int or str") if value <= 0: raise_error(f"value={value} must be positive") return value
[docs] def parse_seconds(value): """Parse seconds in `int` or DdH:M:S formats Args: value (object): to be parsed Returns: int: non-negative number of seconds """ if isinstance(value, int): if value < 0: raise_error(f"seconds may not be negative value={value}") return value if not isinstance(value, str): raise_error(f"seconds must be int or str value={value}") m = _PARSE_SECONDS.search(value) if not m or not any(m.groups()): raise_error(f"seconds must match [Dd][[[H:]M:]S] value={value}") v = 0 for x, i in zip((86400, 3600, 60, 1), m.groups()): if i is not None: v += int(i) * x return v
#: deprecated version of parse_seconds parse_secs = parse_seconds
[docs] @parse_none def parse_set(value): """Default parser for `set` and `frozenset` types When the parser is `set` or `frozenset`, it will be replaced with this routine, which parses strings, lists, and sets. It splits strings on ':'. Args: value (object): to be parsed Returns: frozenset: may be an empty set """ return frozenset(parse_tuple(value))
[docs] @parse_none def parse_tuple(value): """Default parser for `tuple` types When the parser is `tuple`, it will be replaced with this routine, which parses strings, lists, sets, and tuples. It splits strings on ':'. Args: value (object): to be parsed Returns: tuple: may be an empty tuple """ if value is None: return tuple() if isinstance(value, tuple): return value if isinstance(value, (list, set, frozenset)): return tuple(value) assert isinstance( value, STRING_TYPES ), "unable to convert type={} to tuple; value={}".format(type(value), value) return tuple(value.split(TUPLE_SEP))
[docs] def raise_error(msg): """Call when there is a config problem""" raise AssertionError(msg)
[docs] def reset_state_for_testing(add_to_environ=None): """Clear the raw values and append add_to_environ Only used for unit tests. ``add_to_environ`` overrides previous value. Args: add_to_environ (dict): values to augment to os.environ """ global _raw_values, _add_to_environ _raw_values = None _add_to_environ = copy.deepcopy(add_to_environ)
[docs] def to_environ(cfg_keys, values=None, exclude_re=None): """Export config (key, values) as dict for environ cfg_keys is a list of dotted words (``['pykern.pkdebug.control']``). Simple globs (``pykern.pkdebug.*``) are supported, which match any word character. This can be used to ensure there is enough depth, e.g. pykern.*.* means there must be at least two undercores in the environment variable. Only environ and add_to_environ config will be considered, not default values, which will be assumed to be processed the same way in a subprocess using this environ. Args: cfg_keys (iter): keys to find values for values (mapping): use for configuration to parse [actual config] exclude_re (object): compile re or str to ignore matches Returns: PKDict: keys and values (str) """ c = flatten_values({}, values) if values else _coalesce_values() res = PKDict() if exclude_re and isinstance(exclude_re, STRING_TYPES): exclude_re = re.compile(exclude_re, flags=re.IGNORECASE) def a(k, v): if exclude_re and exclude_re.search(k): return if not isinstance(v, STRING_TYPES): if v is None: v = "" elif isinstance(v, bool): v = "1" if v else "" elif isinstance(v, (frozenset, list, set, tuple)): v = TUPLE_SEP.join(v) else: v = str(v) res[k.upper()] = v for k in cfg_keys: k = k.lower().replace(".", "_") if "*" not in k: if k in c: a(k, c[k]) continue r = re.compile(k.replace("*", r"\w+"), flags=re.IGNORECASE) for x, v in c.items(): if r.search(x): a(x, v) return res
class _Declaration(object): """Initialize a single parameter declaration Args: name (str): for error output value (tuple or dict): specification for parameter Attributes: default (object): value to be assigned if not explicitly configured docstring (str): documentation for the parameter group (Group): None or Group instance parser (callable): how to parse a configured value required (bool): the param must be explicitly configured """ def __init__(self, value): if isinstance(value, dict): self.group = value self.parser = None self.default = None self.docstring = "" # TODO(robnagler) _group_has_required(value) self.required = False return assert len(value) == 3, "{}: declaration must be a 3-tuple".format(value) self.default = value[0] self.parser = value[1] self.docstring = value[2] assert callable(self.parser), "{}: parser must be a callable: ".format( self.parser, self.docstring ) self.group = None self.required = isinstance(value, Required) self._fixup_parser() def _fixup_parser(self): if self.parser == bool: t = (int,) self.parser = parse_bool elif self.parser == tuple: t = (tuple,) self.parser = parse_tuple elif self.parser in (set, frozenset): t = (frozenset, set, tuple) self.parser = parse_set else: return if self.required: return # better error message than what parser might put out assert isinstance( self.default, t ), "default={} must be a type={} docstring={}".format( self.default, [str(x) for x in t], self.docstring, ) # validate the default self.default = self.parser(self.default) class _Key(str, object): """Internal representation of a key for a value The str value is lowercase joined with ``_``. For debugging, ``msg`` is printed (original case, joined on '.'). The parts are saved for creating nested values. """ @staticmethod def __new__(cls, parts): self = super(_Key, cls).__new__(cls, "_".join(parts).lower()) self.parts = parts self.msg = ".".join(parts) return self def _clean_environ(): """Ensure os.environ keys are valid (no bash function names) Also sets empty string to `None`. Returns: dict: copy of a cleaned up `os.environ` """ def _value(env, k): return env[k] if len(env[k]) > 0 else None def _add_others(env, res): for k in sorted(env.keys()): if KEY_RE.search(k): x = k.upper() if x not in res: res[x] = _value(env, k) def _add_uppers(env, res): for k in sorted(env.keys()): if _ENV_KEY_RE.search(k): res[k] = _value(env, k) del env[k] res = {} env = os.environ.copy() # TODO(robnagler) this makes it easier to set debugging, but it's a hack if "pkdebug" in env and "PYKERN_PKDEBUG_CONTROL" not in env: env["PYKERN_PKDEBUG_CONTROL"] = env["pkdebug"] if _add_to_environ: env.update(_add_to_environ) _add_uppers(env, res) _add_others(env, res) return res def _coalesce_values(): """Coalesce environ and add_to_environ Sets up channel. Returns: dict: raw values """ global _raw_values, _parsed_values global cfg if _raw_values: return _raw_values values = {} env = _clean_environ() flatten_values(values, env) channel = values.get(CHANNEL_ATTR, CHANNEL_DEFAULT) assert channel in VALID_CHANNELS, "{}: invalid ${}; must be {}".format( channel, CHANNEL_ATTR.upper(), VALID_CHANNELS ) values[CHANNEL_ATTR] = channel _raw_values = values _parsed_values = dict(((_Key([k]), v) for k, v in env.items())) a = PKDict( _caller_module=sys.modules[__name__], channel=Required(str, "which (stage) function returns config"), ) cfg = init(**a) a.dev_mode = (channel_in("dev"), bool, "controls development features") cfg = init(**a) return _raw_values def _flatten_keys(key_parts, values, res): """Turns values into non-nested dict with `_Key` keys, flat Args: key_parts (list): call with ``[]`` values (dict): nested dicts of config values res (dict): result container (call with ``{}``) """ for k in values: v = values[k] k = _Key(key_parts + k.split(".")) assert KEY_RE.search(k), "{}: invalid key must match {}".format(k.msg, KEY_RE) assert not k in res, "{}: duplicate key".format(k.msg) if isinstance(v, dict): _flatten_keys(k.parts, v, res) else: # Only store leaves res[k] = v def _iter_decls(decls, res): """Iterates decls and resolves values into res Args: decls (dict): nested dictionary of a module's cfg values res (PKDict): result configuration for module """ for k in sorted(decls.keys()): # TODO(robnagler) deal with keys with '.' in them (not possible?) d = _Declaration(decls[k]) r = res for kp in k.parts[:-1]: if kp not in r: r[kp] = PKDict() r = r[kp] kp = k.parts[-1] if d.group: r[kp] = PKDict() continue try: r[kp] = _resolver(d)(k, d) except Exception as e: pkinspect.append_exception_reason(e, f"key={kp}, value={decls[k]}") raise _parsed_values[k] = r[kp] def _resolver(decl): """How to resolve values for declaration Args: decl (_Declaration): what to resolve Returns: callable: `_resolve_dict`, `_resolve_list`, or `_resolve_value` """ if dict == decl.parser: return _resolve_dict if list == decl.parser: return _resolve_list return _resolve_value def _resolve_dict(key, decl): # TODO(robnagler) assert "required" res = PKDict(copy.deepcopy(decl.default) if decl.default else {}) assert isinstance(res, dict), "{}: default ({}) must be a dict".format( key.msg, decl.default ) key_prefix = key + "_" for k in reversed(sorted(_raw_values.keys())): if k != key and not k.startswith(key_prefix): continue r = res if len(k.parts) == 1: # os.environ has only one part (no way to split on '.') # so we have to assign the key's suffix manually ki = k.parts[0][len(key_prefix) :] # TODO(robnagler) if key exists, preserve case (only for environ) else: kp = k.parts[len(key.parts) : -1] for k2 in kp: if not k2 in r: r[k2] = PKDict() else: assert isinstance( r[k2], dict ), "{}: type collision on existing non-dict ({}={})".format( k.msg, k2, r[k2] ) r = r[k2] ki = k.parts[-1] r[ki] = _raw_values[k] return res def _resolve_list(key, decl): # TODO(robnagler) assert required res = copy.deepcopy(decl.default) if decl.default else [] assert isinstance(res, list), "{}: default ({}) must be a list".format( key.msg, decl.default ) if key not in _raw_values: assert not decl.required, "{}: config value missing and is required".format(k) return res if not isinstance(_raw_values[key], list): if _raw_values[key] is None: return None raise_error( "{}: value ({}) must be a list or None".format(key.msg, _raw_values[key]), ) return _raw_values[key] + res def _resolve_value(key, decl): if key in _raw_values: res = _raw_values[key] else: assert not decl.required, "{}: config value missing and is required".format( key.msg ) res = decl.default # TODO(robnagler) FOO_BAR='' will not be evaluated. It may need to be # if None is not a valid option and there is a default if res is None and not hasattr(decl.parser, _PARSE_NONE_ATTR): return None return decl.parser(res) def _z(msg): """Useful for debugging this module""" with open("/dev/tty", "w") as f: f.write(str(msg) + "\n")