Source code for pisa.utils.config_parser

"""
Parse a config file into a dict containing an item for every analysis
stage, that itself contains all necessary instantiation arguments/objects for
that stage. For an example config file, please consider
:file:`$PISA/pisa_examples/resources/settings/pipeline/example.cfg`

Config File Structure
=====================

A pipeline config file is expected to contain something like the following,
with the sections ``[pipeline]`` and corresponding ``[stage.service]``
required, in addition to a ``[binning]`` section:

.. code-block:: cfg

    #include file_x.cfg as x
    #include file_y.cfg as y

    [pipeline]

    order = stageA.serviceA, stageB.serviceB

    output_binning = binning1
    output_key = weights, errors


    [binning]

    #include generic_binning.cfg

    binning1.order = axis1, axis2
    binning1.axis1 = {
        'num_bins': 40, 'is_log': True, 'domain': [1,80] units.GeV, 'tex': r'A_1'
        }
    binning1.axis2 = {
        'num_bins': 10, 'is_lin': True, 'domain': [1,5], 'tex': r'A_2'
        }


    [stageA.serviceA]

    calc_mode = binning1
    apply_mode = binning1
    error_method = None
    debug_mode = False

    param.p1 = 0.0 +/- 0.5 units.deg
    param.p1.fixed = False
    param.p1.range = nominal + [-2.0, +2.0] * sigma

    param.selector1.p2 = 0.5 +/- 0.5 units.deg
    param.selector1.p2.fixed = False
    param.selector1.p2.range = nominal + [-2.0, +2.0] * sigma

    param.selector2.p2 = -0.5 +/- 0.1 units.deg
    param.selector2.p2.fixed = False
    param.selector2.p2.range = nominal + [-2.0, +2.0] * sigma


    [stageB.serviceB]

    calc_mode = binning1
    apply_mode = binning1
    error_method = None
    debug_mode = False

    param.p1 = ${stageA.serviceA:param.p1}
    param.selector1.p2 = ${stageA.serviceA:param.selector1.p2}
    param.selector2.p2 = ${stageA.serviceA:param.selector2.p2}
    ...

* ``#include`` statements can be used to include other config files. The
  #include statement must be the first non-whitespace on a line, and these
  statements can be used anywhere within a config file.
* ``#include resource as xyz`` statements behave similarly, but prepend the
  included file's text with a section header containing ``xyz`` in this case.
* ``pipeline`` is the top-most section that defines the hierarchy of stages
  and services to be instantiated. An ``output_binning`` is required to be able
  to get a :class:`pisa.core.map.MapSet` (set of histograms) output for
  the pipeline; ``output_key`` then specifies the keys of the data passed
  through the pipeline which contain histogram weights and (if desired) errors
  (note: the presence of these keys is in general not obvious from a given
  pipeline config file itself)
* ``binning`` can contain different binning definitions, that are then later
  referred to from within the ``stage.service`` sections.
* ``stage.service``: one such section per stage.service is necessary. It may
  contain the options ``debug_mode``, ``error_method``, ``calc_mode``,
  ``apply_mode``, which are common to all stages, and must contain all the
  necessary arguments and parameters for a given stage.service.
* Duplicate section headers and duplicate keys within a section are illegal.


Param definitions
-----------------

Every key in a stage section that starts with ``param.<name>`` is interpreted and
parsed into a :class:`pisa.core.param.Param` object. These can be strings
(e.g. a filename--but don't use any quotation marks) or quantities (numbers
with units).

Quantities expect an expression that can be converted by the
:func:`parse_quantity` function. The expression for a quantity can optionally
include a simple Gaussian prior and units. The simplest definition of a
quantity with neither Gaussian prior nor units would look something like this:

.. code-block:: cfg

    param.p1 = 12.5

Gaussian priors can be included for a quantity using ``+/-`` notation, where
the number that follows ``+/-`` is the standard deviation. E.g.:

.. code-block:: cfg

    param.p1 = 12.5 +/- 2.3

If no units are explicitly set for a quantity, it is taken to be a quantity
with special units ``dimensionless``. Units can be set by multiplying (using
``*``) by ``units.<unit>`` where ``<unit>`` is the short or long name
(optionally including metric prefix) of a unit. For example, the following
lines set equivalent values for params ``p1`` and ``p2``:

.. code-block:: cfg

    param.p1 = 12.5 * units.GeV
    param.p2 = 12.5 * units.gigaelectronvolt

and this can be combined with the Gaussian-prior ``+/-`` notation:

.. code-block:: cfg

    param.p1 = 12.5 +/- 2.3 * units.GeV

Additional arguments to a parameter are passed in with the ``.`` notation, for
example ``param.p1.fixed = False``, which makes p1 a free parameter in the
fit (by default a parameter is fixed unless specified like this).

A uniform, spline, or Jeffreys :class:`pisa.core.prior.Prior` can also be set
using the ``.prior`` attribute:

.. code-block:: cfg

    param.p1 = 12.5
    param.p1.prior = uniform

    param.p2 = 12.5
    param.p2.prior = spline
    param.p2.prior.data = resource_loc

    param.p3 = 12.5
    param.p3.prior = jeffreys

In the second case, a ``.prior.data`` option will be expected, pointing to the
spline data file.
If no prior is specified, it is taken to have no prior (or, equivalently, a
uniform prior with no penalty).

A range must be given for a free parameter. Either as absolute range ``[x,y]``
or in conjunction with the keywords ``nominal`` (= nominal parameter value) and
``sigma`` if the param was specified with the ``+/-`` notation.


N.B.
++++
Params that have the same name in multiple stages of the pipeline are
instantiated as references to a single param in memory, so updating one updates
all of them.

Note that this mechanism of synchronizing parameters holds only within the
scope of a single pipeline; synchronization of parameters across pipelines is
done by adding the pipelines to a single
:class:`pisa.core.distribution_maker.DistributionMaker` object and updating
params through the DistributionMaker's
:func:`pisa.core.distribution_maker.DistributionMaker.update_params` method.

If you DO NOT want parameters to be synchronized, provide a ``unique_id`` for them.
This is simply done by setting ``.unique_id``


Param selector
--------------

A special mechanism allows the user to specify different values for
the same param via the :class:`pisa.core.param.ParamSelector` mechanism.
This can be used for example for hypothesis testing, where for hypothesis A a
param takes one value and for hypothesis B another.

A given param, say ``foo``, then needs two definitions like the following,
assuming we name our selections ``A`` and ``B``:

.. code-block:: cfg

    param.A.foo = 1
    param.B.foo = 2

The default param selector needs to be specified in the ``pipeline`` section as
e.g.

.. code-block:: cfg

    param_selections = A

, which will default to the value of 1 for param ``foo``. An instantiated
pipeline can dynamically switch to another selection after instantiation.

Multiple different param selections are allowed in a single config. In the
default selection they must be separated by commas.


N.B.
++++
Currently, for better or worse, the param selector mechanism requires at least
one stage which contains `all` of the specified selections.


"""

# TODO: consistency, etc.
# * Order-independent hashing of the PISAConfigParser object (recursively sort
#   contents?). This is still a worse idea than hashing on instantiated PISA
#   objects since things like meaningless whitespace will modify the hash of
#   the config.
# * Add explicit gaussian prior (should NOT just rely on +/- notation to make
#   consistent with other priors)
# * Furthermore, all priors should be able to be defined in one line, e.g.:
#     p1.prior = guassian: std_dev = 1.2
#     p2.prior = uniform
#     p3.prior = spline: data = resource/location/config.cfg
#     p4.prior = None
# * Make interoperable with pisa.utils.resources. I.e., able to work with
#   Python package resources, not just filesystem files.
# * Docstrings
# * TODO: add try: except: blocks around class instantiation calls to give
#   maximally useful error info to the user (spit out a good message, but then
#   re-raise the exception)


from __future__ import absolute_import, division

from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from collections import OrderedDict
from collections.abc import Mapping
from io import StringIO
import math
from os.path import abspath, expanduser, expandvars, isfile, join
import re
import sys

from configparser import (
    RawConfigParser, ExtendedInterpolation, DuplicateOptionError,
    SectionProxy, MissingSectionHeaderError, DuplicateSectionError,
    NoSectionError
)
import numpy as np
from uncertainties import ufloat, ufloat_fromstr

from pisa import ureg
from pisa.utils.fileio import from_file
from pisa.utils.format import split
from pisa.utils.hash import hash_obj
from pisa.utils.log import Levels, logging, set_verbosity
from pisa.utils.resources import find_resource


__all__ = ['PARAM_RE', 'PARAM_ATTRS', 'STAGE_SEP',
           'parse_quantity', 'parse_string_literal',
           'interpret_param_subfields', 'parse_param', 'parse_pipeline_config',
           'MutableMultiFileIterator', 'PISAConfigParser']

__author__ = 'P. Eller, J. Lanfranchi'

__license__ = '''Copyright (c) 2014-2026, The IceCube Collaboration

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.'''


PARAM_RE = re.compile(
    r'^param\.(?P<subfields>(([^.\s]+)(\.|$))+)',
    re.IGNORECASE
)

PARAM_ATTRS = ['range', 'prior', 'fixed', 'tex', 'scales_as_log']

STAGE_SEP = '.'

# Define names that users can specify in configs such that the eval of those
# strings works.
numpy = np # pylint: disable=invalid-name
inf = np.inf # pylint: disable=invalid-name
units = ureg # pylint: disable=invalid-name


[docs] def parse_quantity(string): """Parse a string into a pint/uncertainty quantity. Parameters ---------- string : string Returns ------- value : pint.quantity of uncertainties.core.AffineScalarFunc Examples -------- >>> quant = parse_quantity('1.2 +/- 0.7 * units.meter') >>> print(str(quant)) 1.2+/-0.7 meter >>> print('{:~}'.format(quant)) 1.2+/-0.7 m >>> print(quant.magnitude) 1.2+/-0.7 >>> print(quant.units) meter >>> print(quant.nominal_value) 1.2 >>> print(quant.std_dev) 0.7 >>> quant = parse_quantity('5 * units.gigametric_ton') >>> print(quant.std_dev) nan Also note that spaces and the "*" are optional: >>> print(parse_quantity('1+/-1units.GeV')) 1.0+/-1.0 gigaelectron_volt """ value = string.replace(' ', '') if 'units.' in value: value, unit = value.split('units.') else: unit = None value = value.rstrip('*') if '+/-' in value: value = ufloat_fromstr(value) else: # Assign a std. dev. of NaN rather than 0 (see uncertainties user guide # on handling NaN) value = ufloat(float(value), float("nan")) value *= ureg(unit) return value
[docs] def parse_string_literal(string): """Evaluate a string with certain special values, or return the string. Any further parsing must be done outside this module, as this is as specialized as we're willing to be in assuming/interpreting what a string is supposed to mean. Parameters ---------- string : string Returns ------- val : bool, None, or str Examples -------- >>> print(parse_string_literal('true')) True >>> print(parse_string_literal('False')) False >>> print(parse_string_literal('none')) None >>> print(parse_string_literal('something else')) 'something else' """ if string.strip().lower() == 'true': return True if string.strip().lower() == 'false': return False if string.strip().lower() == 'none': return None return string
[docs] def interpret_param_subfields(subfields, selector=None, pname=None, attr=None): """Recursively interpret and parse parameter subfields. Parameters ---------- subfields : list of strings selector : string pname : string attr : list of strings Returns ------- infodict : dict Examples -------- >>> print(interpret_param_subfields(subfields=['nh', 'deltam31', 'range'])) {'pname': 'deltam31', 'subfields': [], 'attr': ['range'], 'selector': 'nh'} """ infodict = dict(subfields=subfields, selector=selector, pname=pname, attr=attr) # Everything has been parsed if not infodict['subfields']: return infodict # If only one field, this must be the param's name, and we're done if len(infodict['subfields']) == 1: infodict['pname'] = infodict['subfields'].pop() return interpret_param_subfields(**infodict) # Look for and remove attr field and any subsequent fields attr_indices = [] for n, field in enumerate(infodict['subfields']): if field in PARAM_ATTRS: attr_indices.append(n) # TODO: not clear what's being done here; also, would slicing be more clear # than iterating & calling pop()? if len(attr_indices) == 1: attr_idx = attr_indices[0] infodict['attr'] = [ infodict['subfields'].pop(attr_idx) for _ in range(attr_idx, len(infodict['subfields'])) ] return interpret_param_subfields(**infodict) elif len(attr_indices) > 1: raise ValueError('Found multiple attrs in config name "%s"' %pname) if len(infodict['subfields']) == 2: infodict['pname'] = infodict['subfields'].pop() infodict['selector'] = infodict['subfields'].pop() return interpret_param_subfields(**infodict) raise ValueError('Unable to parse param subfields %s' %infodict['subfields'])
[docs] def parse_param(config, section, selector, fullname, pname, value): """Parse a param specification from a PISA config file. Note that if the param specification does not include ``fixed``, ``prior``, and/or ``range``, the defaults for these are: ``fixed = True``, ``prior = None``, and ``range = None``. If a prior is specified explicitly via ``.prior``, this takes precendence, but if no ``.prior`` is specified and the param's value is parsed to be a :class:`uncertainties.AffineScalarFunc` (i.e. have ``std_dev`` attribute), a Gaussian prior is constructed from that and then the AffineScalarFunc is stripped out of the param's value (such that it is just a :class:`~pint.quantity.Quantity`). Parameters ---------- config : pisa.utils.config_parser.PISAConfigParser section : string selector : string or None fullname : string pname : string value : string Returns ------- param : pisa.core.param.Param """ # imports placed here to avoid circular imports from pisa.core.param import Param, DerivedParam from pisa.core.prior import Prior kwargs = dict(name=pname, is_fixed=True, prior=None, range=None) try: value = parse_quantity(value) kwargs['value'] = value.nominal_value * value.units except ValueError: value = parse_string_literal(value) kwargs['value'] = value # Search for explicit attr specifications if config.has_option(section, fullname + '.fixed'): kwargs['is_fixed'] = config.getboolean(section, fullname + '.fixed') if config.has_option(section, fullname + '.scales_as_log'): kwargs['scales_as_log'] = config.getboolean(section, fullname + '.scales_as_log') if config.has_option(section, fullname + '.unique_id'): kwargs['unique_id'] = config.get(section, fullname + '.unique_id') if config.has_option(section, fullname + '.tex'): kwargs['tex'] = config.get(section, fullname + '.tex') if config.has_option(section, fullname + '.range'): range_ = config.get(section, fullname + '.range') # Note: `nominal` and `sigma` are called out in the `range_` string if 'nominal' in range_: nominal = value.n * value.units # pylint: disable=unused-variable if 'sigma' in range_: sigma = value.s * value.units # pylint: disable=unused-variable range_ = range_.replace('[', 'np.array([') range_ = range_.replace(']', '])') # Strip out uncertainties from value itself (as we will rely on the # prior from here on out) kwargs['range'] = eval(range_).to(value.units) # pylint: disable=eval-used if config.has_option(section, fullname + '.function_file'): kwargs["function_file"] = config.get(section, fullname + '.function_file') if config.has_option(section, fullname + '.depends_names'): # This means this is a derived parameter, so we throw away the default # `fixed` and `prior` kwargs del kwargs['is_fixed'] del kwargs['prior'] depends = config.get(section, fullname + '.depends_names') kwargs['depends_names'] = depends.split(' ') if config.has_option(section, fullname + '.prior'): prior = str(config.get(section, fullname + '.prior')).strip().lower() if prior == 'uniform': kwargs['prior'] = Prior(kind='uniform') elif prior == 'jeffreys': kwargs['prior'] = Prior( kind='jeffreys', A=kwargs['range'][0], # pylint: disable=unsubscriptable-object B=kwargs['range'][1], # pylint: disable=unsubscriptable-object ) elif prior == 'spline': priorname = pname if selector is not None: priorname += '_' + selector data = config.get(section, fullname + '.prior.data') data = from_file(data) data = data[priorname] knots = ureg.Quantity(np.asarray(data['knots']), data['units']) knots = knots.to(value.units) coeffs = np.asarray(data['coeffs']) deg = data['deg'] kwargs['prior'] = Prior(kind='spline', knots=knots, coeffs=coeffs, deg=deg) elif prior == 'none': kwargs['prior'] = None elif 'gauss' in prior: raise Exception('Please use new style +/- notation for gaussian' ' priors in config') else: raise Exception('Prior type unknown') elif hasattr(value, 'std_dev') and not math.isnan(value.std_dev): kwargs['prior'] = Prior(kind='gaussian', mean=value.nominal_value * value.units, stddev=value.std_dev * value.units) # Strip out any uncertainties from value itself (an explicit ``.prior`` # specification takes precedence over this) if hasattr(value, 'std_dev'): value = value.nominal_value * value.units try: if "depends_names" in kwargs: param = DerivedParam(**kwargs) else: param = Param(**kwargs) except: logging.error('Failed to instantiate new Param object with kwargs %s', kwargs) raise return param
def _parse_varbinning(config, binning, order, bin_split): """Parse and initialize a `VarBinning` instance from config. Returns ------- VarBinning """ # imports placed here to avoid circular imports from pisa.core.binning import MultiDimBinning, OneDimBinning, VarBinning # pylint: disable=import-outside-toplevel try: bin_split = eval(bin_split) # pylint: disable=eval-used except: assert isinstance(bin_split, str) # Just split original str into individual selection strs bin_split = split(bin_split) else: assert isinstance(bin_split, Mapping) # If input can be parsed as dict, split events according to # the presumably contained OneDimBinning definition bin_split = OneDimBinning(**bin_split) nselections = len(bin_split) # instantiate the OneDimBinnings corresponding to each selection bins = [[] for i in range(nselections)] for bin_name in order: def_raw = config.get('binning', binning + '.' + bin_name) kwargs = eval(def_raw) # pylint: disable=eval-used if isinstance(kwargs, list): # Dedicated OneDimBinning kwargs for each selection assert len(kwargs) == nselections else: # Broadcast the universal OneDimBinning kwargs across # all selections kwargs = [kwargs] * nselections for i, kw in enumerate(kwargs): bins[i].append(OneDimBinning(name=bin_name, **kw)) mask = config['binning'].get(binning + '.mask', None) if mask is not None: mask = eval(mask) # pylint: disable=eval-used if isinstance(mask[0], list): # Dedicated mask for each selection assert len(mask) == nselections else: # Broadcast the universal mask across all selections mask = [mask] * nselections else: # No mask for any selection mask = [None] * nselections multibins = [] for i in range(nselections): mb = MultiDimBinning( dimensions=bins[i], name=binning+f"_{i}", mask=mask[i] ) multibins.append(mb) return VarBinning(binnings=multibins, selections=bin_split) def _parse_multidimbinning(config, binning, order): """Parse and initialize a `MultiDimBinning` instance from config. Returns ------- MultiDimBinning """ # imports placed here to avoid circular imports from pisa.core.binning import MultiDimBinning, OneDimBinning # pylint: disable=import-outside-toplevel bins = [] for bin_name in order: try: def_raw = config.get('binning', binning + '.' + bin_name) except: dims_defined = [ split(dim, sep='.')[1] for dim in config['binning'].keys() if dim.startswith(binning + '.') and not dim.endswith('.order') ] logging.error( "Failed to find definition of '%s' dimension of '%s'" " binning entry. Only found definition(s) of: %s", bin_name, binning, dims_defined ) del dims_defined raise try: kwargs = eval(def_raw) # pylint: disable=eval-used except: logging.error( "Failed to evaluate definition of '%s' dimension of" " '%s' binning entry:\n'%s'", bin_name, binning, def_raw ) raise try: bins.append(OneDimBinning(name=bin_name, **kwargs)) except: logging.error( "Failed to instantiate new `OneDimBinning` from '%s'" " dimension of '%s' binning entry with definition:\n" "'%s'\n", bin_name, binning, kwargs ) raise # Get the bin mask, if there is one mask = config['binning'].get(binning + '.mask', None) if mask is not None : mask = eval(mask) # pylint: disable=eval-used # Create the binning object return MultiDimBinning(dimensions=bins, name=binning, mask=mask)
[docs] def parse_pipeline_config(config): """Parse pipeline config. Parameters ---------- config : string or pisa.utils.config_parser.PISAConfigParser Returns ------- stage_dicts : OrderedDict Keys are (stage_name, service_name) tuples and values are OrderedDicts with arguments' names as keys and values as values. Some known arg values are parsed out fully into Python objects, while the rest remain as strings that must be used or parsed elsewhere. """ # Note: imports placed here to avoid circular imports from pisa.core.param import ParamSelector, DerivedParam # pylint: disable=import-outside-toplevel if isinstance(config, str): config = from_file(config) elif isinstance(config, PISAConfigParser): pass else: raise TypeError( '`config` must either be a string or PISAConfigParser. Got %s ' 'instead.' % type(config) ) if not config.has_section('binning'): raise NoSectionError( "Could not find 'binning'. Only found sections: %s" % config.sections() ) # Create binning objects binning_dict = {} # Loop over binning lines for name, value in config['binning'].items(): if name.endswith('.order'): # Found the first line in a new binning, get the individual bin # dimension definitions... order = split(config.get('binning', name)) binning, _ = split(name, sep='.') bin_split = config['binning'].get(binning + '.split', None) if bin_split is not None: # User requested split into several event samples with their # own MultiDimBinning definitions. binning_dict[binning] = _parse_varbinning(config, binning, order, bin_split) else: # Requested only one single MultiDimBinning for all events binning_dict[binning] = _parse_multidimbinning(config, binning, order) stage_dicts = OrderedDict() # Pipeline section section = 'pipeline' stage_dicts[section] = {} # Get and parse the order of the stages (and which services implement them) order = [split(x, STAGE_SEP) for x in split(config.get(section, 'order'))] # Name of pipeline if config.has_option(section, 'name'): stage_dicts[section]['name'] = config.get(section, 'name') else: stage_dicts[section]['name'] = "none" if config.has_option(section, 'output_binning'): stage_dicts[section]['output_binning'] = binning_dict[ config.get(section, 'output_binning') ] output_key = split(config.get(section, 'output_key')) if len(output_key) == 1: stage_dicts[section]['output_key'] = output_key[0] elif len(output_key) == 2: stage_dicts[section]['output_key'] = tuple(output_key) else: raise ValueError( f'''Output key should be exactly one key, or a tuple (key, error_key), but is {output_key}''' ) else: stage_dicts[section]['output_binning'] = None stage_dicts[section]['output_format'] = None stage_dicts[section]['output_key'] = None param_selections = [] if config.has_option(section, 'param_selections'): param_selections = split(config.get(section, 'param_selections')) if config.has_option(section, 'detector_name'): stage_dicts[section]['detector_name'] = config.get( section, 'detector_name' ) else: stage_dicts[section]['detector_name'] = None # Parse [stage.<stage_name>] sections and store to stage_dicts for stage, service in order: # pylint: disable=too-many-nested-blocks old_section_header = 'stage%s%s' % (STAGE_SEP, stage) new_section_header = '%s%s%s' % (stage, STAGE_SEP, service) if config.has_section(old_section_header): logging.warning( '"%s" is an old-style section header, in the future use "%s"', old_section_header, new_section_header ) section = old_section_header elif config.has_section(new_section_header): section = new_section_header else: raise IOError( 'missing section in cfg for stage "%s" service "%s"' % (stage, service) ) # Instantiate dict to store args to pass to this stage service_kwargs = OrderedDict() param_selector = ParamSelector(selections=param_selections) service_kwargs['params'] = param_selector n_params = 0 n_derived_params = 0 for fullname in config.options(section): try: value = config.get(section, fullname) except: logging.error( 'Unable to obtain value of option "%s" in section "%s".', fullname, section ) raise # See if this matches a param specification param_match = PARAM_RE.match(fullname) if param_match is not None: n_params += 1 param_match_dict = param_match.groupdict() param_subfields = param_match_dict['subfields'].split('.') # Figure out what the dotted fields represent... infodict = interpret_param_subfields(subfields=param_subfields) # If field is an attr, skip since these are located manually if infodict['attr'] is not None: continue # Check if this param already exists in a previous stage; if # so, make sure there are no specs for this param, but just a # link to previous the param object that is already # instantiated. for kw in stage_dicts.values(): # Stage did not get a `params` argument from config if not 'params' in kw: continue # Retrieve the param from the ParamSelector try: param = kw['params'].get( name=infodict['pname'], selector=infodict['selector'] ) except KeyError: continue # Make sure there are no other specs (in this section) for # the param defined defined in previous section for a in PARAM_ATTRS: if config.has_option(section, '%s.%s' %(fullname, a)): raise ValueError("Parameter spec. '%s' of '%s' " "found in section '%s', but " "parameter exists in previous " "stage!"%(a, fullname, section)) break # Param *not* found in a previous stage (i.e., no explicit # `break` encountered in `for` loop above); therefore must # instantiate it. else: param = parse_param( config=config, section=section, selector=infodict['selector'], fullname=fullname, pname=infodict['pname'], value=value ) if isinstance(param, DerivedParam): n_derived_params += 1 param_selector.update(param, selector=infodict['selector']) # If it is a binning defined in the "binning" section, use the # parsed value elif value in binning_dict.keys(): service_kwargs[fullname] = binning_dict[value] # If it's not a param spec but contains 'binning', assume it's a # binning spec for CAKE stages elif 'binning' in fullname: service_kwargs[fullname] = binning_dict[value] # it's gonna be a PI stage elif fullname in ['calc_mode', 'apply_mode', 'output_format']: value = parse_string_literal(value) # is it None? if value is None: service_kwargs[fullname] = value # is it a binning? if value in binning_dict.keys(): service_kwargs[fullname] = binning_dict[value] # whatever else: service_kwargs[fullname] = value # it's a list on in/output names list elif fullname.endswith('_names'): value = split(value) service_kwargs[fullname] = value # Otherwise it's some other stage instantiation argument; identify # this by its full name and try to interpret and instantiate a # Python object using the string else: if re.search(r'[^a-z_]units\.[a-z]+', value, flags=re.IGNORECASE): try: new_value = parse_quantity(value) new_value = new_value.nominal_value * new_value.units except ValueError: new_value = parse_string_literal(value) else: new_value = parse_string_literal(value) service_kwargs[fullname] = new_value # If no params actually specified in config, remove 'params' from the # service's keyword args if n_params == 0: service_kwargs.pop('params') # finish setting up the derived parameters if n_derived_params != 0: for param in param_selector: if isinstance(param, DerivedParam): # Give the derived parameter references to the parameters # it depends on param.dependson = [ param_selector.get(name) for name in param.depends_names ] # Store the service's kwargs to the stage_dicts stage_dicts[(stage, service)] = service_kwargs return stage_dicts
[docs] class MutableMultiFileIterator(object): """ Iterate through the lines of an already-open file (`fp`) but then can pause at any point and open and iterate through another file via the `switch_to_file` method (and this file can be paused to iterate through another, etc.). This has the effect of in-lining files within files for e.g. parsing multiple files as if they're a singe file. Which line comes from which file is also tracked for generating maximally-useful error messages, via the `location` method. Note that circular references are not allowed. Parameters ---------- fp : file-like object The (opened) main config to be read. E.g. can be an opened file, io.StringIO object, etc. fpname : string Identifier for the initially `fp` object """ def __init__(self, fp, fpname, fpath=None): self._iter_stack = [] """Stack for storing dicts with 'fp', 'fpname', 'fpath', 'lineno', and 'line' for keeping track of the hierarchy of master config & included configs""" # It's ok to not find the fpname / fpname to not be a file for the # *master* config, since this could e.g. be a io.StringIO file-like # object (`read_string`) which comes from no actual file/resource on # disk. if not fpname and hasattr(fp, 'name'): fpname = fp.name if fpath is None: try: resource = find_resource(fpname) except IOError: pass else: if isfile(resource): fpath = abspath(expanduser(expandvars(fpname))) if fpath is None: try: resource = find_resource(fpname) except IOError: pass else: if isfile(resource): fpath = resource if fpath is None: self.fpaths_processed = [] else: self.fpaths_processed = [fpath] self.fps_processed = [fp] record = dict(fp=fp, fpname=fpname, fpath=fpath, lineno=0, line='') self._iter_stack.append(record) self.file_hierarchy = OrderedDict([(fpname, OrderedDict())]) def __next__(self): """Iterate through lines in the file(s). Returns ------- line : string The next line from the current file. fpname : string The `fpname` of the file from which the line was gotten. lineno : int The line number in the file. """ if not self._iter_stack: self._cleanup() raise StopIteration try: record = self._iter_stack[-1] record['line'] = next(record['fp']) record['lineno'] += 1 return record except StopIteration: record = self._iter_stack.pop() logging.trace(('Finished processing "{fpname:s}" with {lineno:d}' ' line(s)').format(**record)) return next(self) except: self._cleanup() raise
[docs] def switch_to_file(self, fp=None, fpname=None): """Switch iterator to a new resource location to continue processing. Parameters ---------- fp : None or file-like object If `fp` is specified, this takes precedence over `fpname`. fpname : None or string Path of the file or resource to read from. This resource will be located and opened if `fp` is None. encoding Argument is passed to the builtin ``open`` function for opening the file. """ fpath = None if fp is None: assert fpname resource = find_resource(fpname) if isfile(resource): fpath = abspath(expanduser(expandvars(resource))) if fpath in self.fpaths_processed: self._cleanup() raise ValueError( 'Circular reference; already processed "%s" at path' ' "%s"' % (fpname, fpath) ) else: self._cleanup() raise ValueError('`fpname` "%s" is not a file') fp_ = open(fpath, encoding=None) else: fp_ = fp if fpname is None: if hasattr(fp_, 'name'): fpname = fp_.name else: fpname = '' try: resource = find_resource(fpname) except IOError: pass else: if isfile(resource): fpath = resource if fp in self.fps_processed: self._cleanup() raise ValueError( 'Circular reference; already processed file pointer "%s"' ' at path "%s"' % (fp_, fpname) ) if fpath is not None: if fpath in self.fpaths_processed: self._cleanup() raise ValueError( 'Circular reference; already processed "%s" at path' ' "%s"' % (fpname, fpath) ) self.fpaths_processed.append(fpath) self.fps_processed.append(fp) if fpath is not None: self.fpaths_processed.append(fpath) logging.trace('Switching to "%s" at path "%s"' % (fpname, fpath)) record = dict(fp=fp_, fpname=fpname, fpath=fpath, lineno=0, line='') self._iter_stack.append(record)
@property def location(self): """string : Full hierarchical location, formatted for display""" info = ['File hierarchy (most recent last):\n'] for record_num, record in enumerate(self._iter_stack): s = ' Line {lineno:d}, fpname "{fpname:s}"' if record_num > 0: s += ' at path "{fpath:s}"' s += '\n {line:s}' info.append(s.format(**record)) return ''.join(info) def __iter__(self): return self def __del__(self): self._cleanup() def _cleanup(self): """Close all file handles opened by this object (i.e. all except the first file pointer, which is provided as an argument to `__init__`)""" for record in self._iter_stack[1:]: record['fp'].close()
[docs] class PISAConfigParser(RawConfigParser): # pylint: disable=too-many-ancestors """ Parses a PISA config file, extending :class:`configparser.RawConfigParser` (the backport of RawConfigParser from Python 3.x) by adding the ability to include external files inline via, for example: .. code-block:: cfg #include /path/to/file.cfg #include path/to/resource.cfg #include path/to/resource2.cfg as section2 [section1] key11 = value1 key12 = ${section2:key21} key13 = value3 where the files or resources located at "/path/to/file.cfg", "path/to/resource.cfg", and "path/to/resource2.cfg" are effectively inlined wherever the ``#include`` statements occur. The ``#include path/to/resource2.cfg as section_name`` syntax prefixes the contents of ``resource2.cfg`` by a section header named "section2", expanding ``resource2.cfg`` as: .. code-block:: cfg [section2] line1 of resource2.cfg line2 of resource2.cfg ... etc. Special parsing rules we have added to make ``#include`` behavior sensible: 1. Using an ``#include file`` that contains a section header (``[section_name]``) *or* using ``#include file as section_name`` requires that the next non-blank / non-comment / non-``#include`` line be a new section header (``[section_name2]``). 2. Empty sections after fully parsing a config will raise a ``ValueError``. This is likely never a desired behavior, and should alert the user to inadvertent use of ``#include``. Also note that, unlike the default :class:`~configparser.ConfigParser` behavior, :class:`~configparser.ExtendedInterpolation` is used, whitespace surrounding text in a section header is ignored, empty lines are *not* allowed between multi-line values, and section names, keys, and values are all case-sensitive. All other options are taken as the defaults / default behaviors of :class:`~configparser.ConfigParser`. See help for :class:`configparser.ConfigParser` for further help on valid config file syntax and parsing behavior. """ _DEFAULT_INTERPOLATION = ExtendedInterpolation() INCLUDE_RE = re.compile(r'\s*#include\s+(?P<include>\S.*)') INCLUDE_AS_RE = re.compile(r'\s*(?P<file>.+)((?:\s+as\s+)(?P<as>\S+))') SECTCRE = re.compile(r'\[\s*(?P<header>[^]]+?)\s*\]') def __init__(self): #self.default_section = None #DEFAULTSECT # Instantiate parent class with PISA-specific options #super().__init__( RawConfigParser.__init__( self, interpolation=ExtendedInterpolation(), empty_lines_in_values=False, ) self.file_iterators = []
[docs] def set(self, section, option, value=None): """Set an option. Extends RawConfigParser.set by validating type and interpolation syntax on the value.""" self._validate_value_types(option=option, value=value) super().set(section, option, value)
[docs] def add_section(self, section): """Create a new section in the configuration. Extends RawConfigParser.add_section by validating if the section name is a string.""" self._validate_value_types(section=section) super().add_section(section)
[docs] def optionxform(self, optionstr): """Enable case-sensitive options in .cfg files, and force all values to be ASCII strings.""" return optionstr #.encode('ascii')
@property def hash(self): """int : Hash value of the contents (does not depend on order of sections, but does depend on order of keys within each section)""" return self.__hash__() def __hash__(self): return hash_obj([(sec, (self.items(sec))) for sec in sorted(self.sections())]) @staticmethod def _get_include_info(line): match = PISAConfigParser.INCLUDE_RE.match(line) if not match: return None include = match.groupdict()['include'] match = PISAConfigParser.INCLUDE_AS_RE.match(include) if match is None: return {'file': include, 'as': None} return match.groupdict()
[docs] def read(self, filenames, encoding=None): """Override `read` method to interpret `filenames` as PISA resource locations, then call overridden `read` method. Also, IOError fails here, whereas it is ignored in RawConfigParser. For further help on this method and its arguments, see :method:`~backports.configparser.configparser.read` """ if isinstance(filenames, str): filenames = [filenames] resource_locations = [] for filename in filenames: resource_location = find_resource(filename) if not isfile(resource_location): raise ValueError( '"%s" is not a file or could not be located' % filename ) resource_locations.append(resource_location) filenames = resource_locations # NOTE: From here on, most of the `read` method is copied, but # ignoring IOError exceptions is removed here. Python copyrights apply. if isinstance(filenames, str): filenames = [filenames] read_ok = [] for filename in filenames: with open(filename, encoding=encoding) as fp: self._read(fp, filename) read_ok.append(filename) return read_ok
# NOTE: the `_read` method is copy-pasted (then modified slightly) from # Python's backports.configparser (version 3.5.0), and so any copyright # notices at the top of this file might need modification to be compatible # with copyrights on that module. # # Also, diff this function with future releases in case something needs # modification. # pylint: disable=E,W,C,R def _read(self, fp, fpname): """Parse a sectioned configuration file. Each section in a configuration file contains a header, indicated by a name in square brackets (`[]'), plus key/value options, indicated by `name' and `value' delimited with a specific substring (`=' or `:' by default). Values can span multiple lines, as long as they are indented deeper than the first line of the value. Depending on the parser's mode, blank lines may be treated as parts of multiline values or ignored. Configuration files may include comments, prefixed by specific characters (`#' and `;' by default). Comments may appear on their own in an otherwise empty line or may be entered in lines holding values or section names. This implementation is extended from the original to also accept .. code:: ini #include <file or pisa_resource> or .. code:: ini #include <file or pisa_resource> as <section_name> syntax anywhere in the file, which switches (via :class:`MutableMultiFileIterator`) to the new file as if it were in-lined within the original file. The latter syntax also prepends a section header .. code:: ini [section_name] before the text of the specified file or pisa_resource. """ elements_added = set() cursect = None # None, or a dictionary sectname = None optname = None lineno = 0 indent_level = 0 e = None # None, or an exception file_iter = MutableMultiFileIterator(fp=fp, fpname=fpname) self.file_iterators.append(file_iter) for record in file_iter: fpname = record['fpname'] lineno = record['lineno'] line = record['line'] comment_start = sys.maxsize # strip inline comments inline_prefixes = dict( (p, -1) for p in self._inline_comment_prefixes) while comment_start == sys.maxsize and inline_prefixes: next_prefixes = {} for prefix, index in inline_prefixes.items(): index = line.find(prefix, index+1) if index == -1: continue next_prefixes[prefix] = index if index == 0 or (index > 0 and line[index-1].isspace()): comment_start = min(comment_start, index) inline_prefixes = next_prefixes # parse #include statement include_info = self._get_include_info(line) if include_info: file_iter.switch_to_file(fpname=include_info['file']) if include_info['as']: as_header = '[%s]\n' % include_info['as'] file_iter.switch_to_file( # Aaron Fienberg # commented out as part of python3 update # fp=StringIO(as_header.decode('utf-8')) fp=StringIO(as_header) ) continue # strip full line comments for prefix in self._comment_prefixes: if line.strip().startswith(prefix): comment_start = 0 break if comment_start == sys.maxsize: comment_start = None value = line[:comment_start].strip() if not value: if self._empty_lines_in_values: # add empty line to the value, but only if there was no # comment on the line if (comment_start is None and cursect is not None and optname and cursect[optname] is not None): cursect[optname].append('') # newlines added at join else: # empty line marks end of value indent_level = sys.maxsize continue # continuation line? first_nonspace = self.NONSPACECRE.search(line) cur_indent_level = first_nonspace.start() if first_nonspace else 0 if (cursect is not None and optname and cur_indent_level > indent_level): cursect[optname].append(value) # a section header or option header? else: indent_level = cur_indent_level # is it a section header? mo = self.SECTCRE.match(value) if mo: sectname = mo.group('header') if sectname in self._sections: if self._strict and sectname in elements_added: raise DuplicateSectionError(sectname, fpname, lineno) cursect = self._sections[sectname] elements_added.add(sectname) elif sectname == self.default_section: cursect = self._defaults else: cursect = self._dict() self._sections[sectname] = cursect self._proxies[sectname] = SectionProxy(self, sectname) elements_added.add(sectname) # So sections can't start with a continuation line optname = None # no section header in the file? elif cursect is None: raise MissingSectionHeaderError(fpname, lineno, line) # an option line? else: mo = self._optcre.match(value) if mo: optname, vi, optval = mo.group('option', 'vi', 'value') # pylint: disable=unused-variable if not optname: e = self._handle_error(e, fpname, lineno, line) optname = self.optionxform(optname.rstrip()) if (self._strict and (sectname, optname) in elements_added): raise DuplicateOptionError(sectname, optname, fpname, lineno) elements_added.add((sectname, optname)) # This check is fine because the OPTCRE cannot # match if it would set optval to None if optval is not None: optval = optval.strip() cursect[optname] = [optval] else: # valueless option handling cursect[optname] = None else: # a non-fatal parsing error occurred. set up the # exception but keep going. the exception will be # raised at the end of the file and will contain a # list of all bogus lines e = self._handle_error(e, fpname, lineno, line) # if any parsing errors occurred, raise an exception if e: raise e self._join_multiline_values()
def test_parse_pipeline_config(config='settings/pipeline/example.cfg'): """Unit test for function `parse_pipeline_config`""" # Load via PISAConfigParser config0 = PISAConfigParser() config0.read(config) config0 = parse_pipeline_config(config0) # Load directly config1 = parse_pipeline_config(config) logging.info('Keys and values found in config:') for key, vals in config1.items(): logging.info('%s: %s', key, vals) assert vals == config0[key] # set some option after parsing and compare config2 = PISAConfigParser() config2.read(config) suffix = '_edit' config2.set( section='pipeline', option='name', value=config1['pipeline']['name'] + suffix ) config2 = parse_pipeline_config(config2) for key, vals in config2.items(): if vals != config1[key]: assert key == 'pipeline' assert vals['name'] == config1['pipeline']['name'] + suffix def test_MutableMultiFileIterator(): """Unit test for class `MutableMultiFileIterator`""" import shutil import tempfile prefixes = ['a', 'b', 'c'] file_len = 4 reference_lines = [ # start in file a 'a0', 'a1', # switch to file b after second line of a 'b0', 'b1', # switch to file c after second line of b 'c0', 'c1', 'c2', 'c3', # switch back to b after exhausting c 'b2', 'b3', # switch back to a after exhausting b 'a2', 'a3' ] tempdir = tempfile.mkdtemp() try: # Create test files paths = [join(tempdir, prefix) for prefix in prefixes] for prefix, path in zip(prefixes, paths): with open(path, 'w') as f: for i in range(file_len): f.write('%s%d\n' % (prefix, i)) logging.trace(path) actual_lines = [] with open(paths[0]) as fp: file_iter = MutableMultiFileIterator(fp=fp, fpname=paths[0]) remaining_paths = paths[1:] for record in file_iter: actual_lines.append(record['line'].strip()) logging.trace(str(record)) if record['line'][1:].strip() == '1': if remaining_paths: path = remaining_paths.pop(0) file_iter.switch_to_file(fpname=path) else: for l in str(file_iter.location).split('\n'): logging.trace(l) except: shutil.rmtree(tempdir) raise if actual_lines != reference_lines: raise ValueError('<< FAIL : test_MutableMultiFileIterator >>') logging.info('<< PASS : test_MutableMultiFileIterator >>') def parse_args(): """Parse command line arguments""" parser = ArgumentParser( description='Print contents of a parsed config file', formatter_class=ArgumentDefaultsHelpFormatter, ) parser.add_argument( 'config', metavar='CONFIGFILE', nargs='?', default='settings/pipeline/example.cfg', help='Pipeline config file to parse', ) parser.add_argument( '-v', action='count', default=Levels.WARN, help='Set verbosity level', ) kwargs = vars(parser.parse_args()) set_verbosity(kwargs.pop('v')) return kwargs if __name__ == '__main__': test_parse_pipeline_config(**parse_args())