#! /usr/bin/env python
"""
DistributionMaker class definition and a simple script to generate, save, and
plot a distribution from pipeline config file(s).
"""
from __future__ import absolute_import
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from collections import OrderedDict
from collections.abc import Mapping
import inspect
import os
from tabulate import tabulate
import numpy as np
from pisa import ureg
from pisa.core.map import MapSet
from pisa.core.pipeline import Pipeline
from pisa.core.param import ParamSet
from pisa.utils.config_parser import PISAConfigParser
from pisa.utils.fileio import expand, mkdir, to_file
from pisa.utils.hash import hash_obj
from pisa.utils.log import set_verbosity, logging
from pisa.utils.random_numbers import get_random_state
__all__ = ['DistributionMaker', 'test_DistributionMaker', 'parse_args', 'main']
__author__ = 'J.L. Lanfranchi, P. Eller'
__license__ = '''Copyright (c) 2014-2017, The IceCube Collaboration
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.'''
class DistributionMaker(object):
    """Container for one or more pipelines; the outputs from all contained
    pipelines are added together to create the distribution.

    Parameters
    ----------
    pipelines : Pipeline or convertible thereto, or iterable thereof
        A new pipeline is instantiated with each object passed. Legal objects
        are already-instantiated Pipelines and anything interpret-able by the
        Pipeline init method.

    label : str or None, optional
        A label for the DistributionMaker.

    set_livetime_from_data : bool, optional
        If a (data) pipeline is found with the attr `metadata` and which has
        the contained key "livetime", this livetime is used to set the livetime
        on all pipelines which have param `params.livetime`. If multiple such
        data pipelines are found and `set_livetime_from_data` is True, all are
        checked for consistency (you should use multiple `Detector`s if you
        have incompatible data sets).

    profile : bool
        Whether to record timing of individual pipelines / stages.

    Notes
    -----
    Free params with the same name in two pipelines are updated at the same
    time so long as you use the `update_params`, `set_free_params`, or
    `_set_rescaled_free_params` methods. Also use `select_params` to select
    params across all pipelines (if a pipeline does not have one or more of
    the param selectors specified, those param selectors have no effect in
    that pipeline).

    `_*_rescaled_*` properties and methods are for interfacing with a
    minimizer, where values are linearly mapped onto the interval [0, 1]
    according to the parameter's allowed range. Avoid interfacing with these
    except if using a minimizer, since, e.g., units are stripped and values and
    intervals are non-physical.
    """

    def __init__(self, pipelines, label=None, set_livetime_from_data=True,
                 profile=False):
        self.label = label
        self._source_code_hash = None
        # Metadata (e.g. data livetime) accumulated from contained pipelines
        self.metadata = OrderedDict()
        self._profile = profile

        self._pipelines = []
        # Wrap a single pipeline spec in a list so the loop below handles
        # single and multiple specs uniformly
        if isinstance(pipelines, (str, PISAConfigParser, OrderedDict,
                                  Pipeline)):
            pipelines = [pipelines]

        for pipeline in pipelines:
            if not isinstance(pipeline, Pipeline):
                pipeline = Pipeline(pipeline, profile=profile)
            else:
                if profile:
                    # Only propagate if set to `True` (don't allow negative
                    # default to negate any original choice for the instance)
                    pipeline.profile = profile
            self._pipelines.append(pipeline)

        data_run_livetime = None
        if set_livetime_from_data:
            #
            # Get livetime metadata if defined in any stage in any pipeline
            #
            for pipeline_idx, pipeline in enumerate(self):
                for stage_idx, stage in enumerate(pipeline):
                    if not (
                        hasattr(stage, "metadata")
                        and isinstance(stage.metadata, Mapping)
                        and "livetime" in stage.metadata
                    ):
                        continue

                    if data_run_livetime is None:
                        # First livetime found becomes the reference value
                        data_run_livetime = stage.metadata["livetime"]
                    elif stage.metadata["livetime"] != data_run_livetime:
                        # All data pipelines must agree on livetime; use
                        # multiple `Detector`s for incompatible data sets
                        raise ValueError(
                            "Pipeline index {}, stage index {} has data"
                            " livetime = {}, in disagreement with"
                            " previously-found livetime = {}".format(
                                pipeline_idx,
                                stage_idx,
                                stage.metadata["livetime"],
                                data_run_livetime,
                            )
                        )

            # Save the last livetime found inside the pipeline's metadata
            # TODO: implement metadata in the pipeline class instead
            self.metadata['livetime'] = data_run_livetime

            #
            # Set param `params.livetime` for any pipelines that have it
            #
            if data_run_livetime is not None:
                data_run_livetime *= ureg.sec
                for pipeline_idx, pipeline in enumerate(self):
                    if "livetime" not in pipeline.params.names:
                        continue

                    # Data defines the livetime; do not allow fitting it
                    pipeline.params.livetime.is_fixed = True

                    # Compare the param's *value* (a quantity) against the
                    # data livetime quantity
                    if pipeline.params.livetime.value != data_run_livetime:
                        logging.warning(
                            "Pipeline index %d has params.livetime = %s, in"
                            " disagreement with data livetime = %s defined by"
                            " data. The pipeline's livetime param will be"
                            " reset to the latter value and set to be fixed"
                            " (if it is not already).",
                            pipeline_idx,
                            pipeline.params.livetime.value,
                            data_run_livetime,
                        )
                        pipeline.params.livetime = data_run_livetime

        #for pipeline in self:
        #    pipeline.select_params(self.param_selections,
        #                           error_on_missing=False)

        # Make sure that all the pipelines have the same detector name (or None)
        self.detector_name = 'no_name'
        for p in self._pipelines:
            name = p.detector_name
            if name != self.detector_name and self.detector_name != 'no_name':
                raise NameError(
                    'Different detector names in distribution_maker pipelines'
                )
            self.detector_name = name

        # Set parameters with an identical name to the same object; otherwise
        # we get inconsistent behaviour when setting repeated params.
        # See Issues #566 and #648.
        # Also, do this for all selections!
        original_selection = self.param_selections
        all_selections = set()
        for pipeline in self:
            for stage in pipeline.stages:
                all_selections.update(stage._param_selector._selector_params.keys())
        for selection in all_selections:
            self.select_params(selection)
            all_params = self.params
            for pipeline in self:
                pipeline.update_params(all_params, existing_must_match=True, extend=False)
        # Restore whatever selection was active before the unification pass
        self.select_params(original_selection)

    def __repr__(self):
        return self.tabulate(tablefmt="presto")

    def _repr_html_(self):
        return self.tabulate(tablefmt="html")

    def tabulate(self, tablefmt="plain"):
        """Return a formatted table summarizing the contained pipelines.

        Parameters
        ----------
        tablefmt : str
            Any table format understood by the `tabulate` package.
        """
        headers = ['pipeline number', 'name', 'detector name', 'output_binning', 'output_key', 'profile']
        colalign = ["right"] + ["center"] * (len(headers) - 1)
        table = []
        for i, p in enumerate(self.pipelines):
            table.append([i, p.name, p.detector_name, p.output_binning, p.output_key, p.profile])
        return tabulate(table, headers, tablefmt=tablefmt, colalign=colalign)

    def __iter__(self):
        # Iterating a DistributionMaker iterates its pipelines
        return iter(self._pipelines)

    def report_profile(self, detailed=False, format_num_kwargs=None):
        """Report timing information on contained pipelines.

        See `Pipeline.report_profile` for details.
        """
        for pipeline in self.pipelines:
            pipeline.report_profile(
                detailed=detailed, format_num_kwargs=format_num_kwargs
            )

    @property
    def profile(self):
        """bool : whether timing of pipelines/stages is recorded"""
        return self._profile

    @profile.setter
    def profile(self, value):
        # Propagate the setting to all contained pipelines
        for pipeline in self.pipelines:
            pipeline.profile = value
        self._profile = value

    def run(self):
        """Run all pipelines"""
        for pipeline in self:
            pipeline.run()

    def setup(self):
        """Setup (reset) all pipelines"""
        for p in self:
            p.setup()

    def get_outputs(self, return_sum=False, sum_map_name='total',
                    sum_map_tex_name='Total', **kwargs):
        """Compute and return the outputs.

        Parameters
        ----------
        return_sum : bool
            If True, add up all Maps in all MapSets returned by all pipelines.
            The result will be a single Map contained in a MapSet.
            If False, return a list where each element is the full MapSet
            returned by each pipeline in the DistributionMaker.

        sum_map_name : str
            Name given to the summed Map (only used if `return_sum=True`).

        sum_map_tex_name : str
            TeX label given to the summed Map (only used if `return_sum=True`).

        **kwargs
            Passed on to each pipeline's `get_outputs` method.

        Returns
        -------
        MapSet if `return_sum=True` or list of MapSets if `return_sum=False`
        """
        outputs = [pipeline.get_outputs(**kwargs) for pipeline in self]  # pylint: disable=redefined-outer-name

        if return_sum:
            # Case where the output of a pipeline is a MapSet
            if isinstance(outputs[0], MapSet):
                outputs = sum([sum(x) for x in outputs])  # This produces a Map
                outputs.name = sum_map_name
                outputs.tex = sum_map_tex_name
                outputs = MapSet(outputs)  # final output must be a MapSet

            # Case where the output of a pipeline is a list of different MapSets
            elif isinstance(outputs[0], list):
                outs = []
                # Sum the i-th MapSet of every pipeline into a single Map
                for i in range(len(outputs[0])):
                    o = sum([sum(x) for x in np.array(outputs)[:, i]])
                    o.name = sum_map_name
                    o.tex = sum_map_tex_name
                    outs.append(MapSet(o))
                outputs = outs

        return outputs

    def update_params(self, params):
        """Update params shared by all pipelines (those not having a given
        param are unaffected)."""
        for pipeline in self:
            pipeline.update_params(params)

    def select_params(self, selections, error_on_missing=True):
        """Select a set of alternate parameter values/specifications across
        all pipelines.

        Parameters
        ----------
        selections : str, iterable of str, or None
            Param selector(s) to apply; if None, only warn about available
            (but unused) selections.

        error_on_missing : bool
            If True, raise KeyError when no pipeline has all `selections`.
        """
        successes = 0
        if selections is not None:
            for pipeline in self:
                try:
                    pipeline.select_params(selections, error_on_missing=True)
                except KeyError:
                    pass
                else:
                    successes += 1

            # It suffices for a single pipeline to accept the selection(s)
            if error_on_missing and successes == 0:
                raise KeyError(
                    'None of the stages from any pipeline in this distribution'
                    ' maker has all of the selections %s available.'
                    %(selections,)
                )
        else:
            for pipeline in self:
                possible_selections = pipeline.param_selections
                if possible_selections:
                    logging.warning(
                        "Although you didn't make a parameter "
                        "selection, the following were available: %s."
                        " This may cause issues.", possible_selections
                    )

    def add_covariance(self, covmat):
        """Incorporate covariance between parameters.

        This is done by replacing relevant correlated parameters with
        "DerivedParams" that depend on new parameters in an uncorrelated basis.

        The parameters are all updated, but this doesn't add the new parameters
        in, so we go to the first stage we find that has one of the original
        parameters and manually add this in.

        See the docstring in "pisa.core.param.ParamSet" for more.
        """
        paramset = self.params
        paramset.add_covariance(covmat)

        self.update_params(paramset)

        success = False
        for pipeline in self.pipelines:
            retval = pipeline._add_rotated(paramset, suppress_warning=True)
            success = success or retval
        if not success:
            raise ValueError(
                "Failed to add rotated parameters to any pipeline"
            )

    @property
    def pipelines(self) -> 'list[Pipeline]':
        """list of Pipeline : the contained pipelines"""
        return self._pipelines

    @property
    def params(self):
        """ParamSet : all params from all contained pipelines"""
        params = ParamSet()
        for pipeline in self:
            params.extend(pipeline.params)
        return params

    @property
    def param_selections(self):
        """list of str : sorted param selections across all pipelines"""
        selections = set()
        for pipeline in self:
            selections.update(pipeline.param_selections)
        return sorted(selections)

    @property
    def source_code_hash(self):
        """Hash for the source code of this object's class.

        Not meant to be perfect, but should suffice for tracking provenance of
        an object stored to disk that were produced by a Stage.
        """
        if self._source_code_hash is None:
            self._source_code_hash = hash_obj(inspect.getsource(self.__class__))
        return self._source_code_hash

    @property
    def hash(self):
        """Hash combining the class source hash and all pipeline hashes"""
        return hash_obj([self.source_code_hash] + [p.hash for p in self])

    @property
    def num_events_per_bin(self):
        """Number of MC events in each bin, summed over all pipelines.

        Assumes that all pipelines have the same binning output specs; the
        number of events is taken out of the last stage of each pipeline.
        """
        num_bins = self.pipelines[0].stages[-1].output_specs.tot_num_bins
        num_events_per_bin = np.zeros(num_bins)

        for p in self.pipelines:
            assert p.stages[-1].output_specs.tot_num_bins==num_bins, 'ERROR: different pipelines have different binning'

        for p in self.pipelines:
            for c in p.stages[-1].data:
                for index in range(num_bins):
                    # Count the weights falling into this bin's mask
                    index_mask = c.array_data['bin_{}_mask'.format(index)].get('host')
                    current_weights = c.array_data['weights'].get('host')[index_mask]
                    n_weights = current_weights.shape[0]
                    num_events_per_bin[index] += n_weights

        return num_events_per_bin

    @property
    def empty_bin_indices(self):
        """Find indices where there are no events present"""
        empty_counts = self.num_events_per_bin == 0
        indices = np.where(empty_counts)[0]
        return indices

    def set_free_params(self, values):
        """Set free parameters' values.

        Parameters
        ----------
        values : list of quantities
            Values, in the same order as `self.params.free.names`.
        """
        for name, value in zip(self.params.free.names, values):
            for pipeline in self:
                if name in pipeline.params.free.names:
                    pipeline.params[name] = value
                elif name in pipeline.params.names:
                    raise AttributeError(
                        'Trying to set value for "%s", a parameter that is'
                        ' fixed in at least one pipeline' %name
                    )

    def randomize_free_params(self, random_state=None):
        """Randomize free parameters uniformly within their allowed ranges.

        Parameters
        ----------
        random_state : None or random state convertible thereto
            If None, use the global numpy random state.
        """
        if random_state is None:
            random = np.random
        else:
            random = get_random_state(random_state)
        n = len(self.params.free)
        rand = random.rand(n)
        self._set_rescaled_free_params(rand)

    def reset_all(self):
        """Reset both free and fixed parameters to their nominal values."""
        for p in self:
            p.params.reset_all()

    def reset_free(self):
        """Reset only free parameters to their nominal values."""
        for p in self:
            p.params.reset_free()

    def set_nominal_by_current_values(self):
        """Define the nominal values as the parameters' current values."""
        for p in self:
            p.params.set_nominal_by_current_values()

    def _set_rescaled_free_params(self, rvalues):
        """Set free param values given a simple list of [0,1]-rescaled,
        dimensionless values
        """
        names = self.params.free.names
        for pipeline in self:
            for name, rvalue in zip(names, rvalues):
                if name in pipeline.params.free.names:
                    pipeline.params[name]._rescaled_value = rvalue  # pylint: disable=protected-access
                elif name in pipeline.params.names:
                    raise AttributeError(
                        'Trying to set value for "%s", a parameter that is'
                        ' fixed in at least one pipeline' %name
                    )
def test_DistributionMaker():
    """Unit tests for DistributionMaker.

    Instantiates a DistributionMaker from the example pipeline config (twice,
    to exercise shared-param behavior), checks `select_params` switches
    `theta23` between hierarchies, and checks propagation of the `profile`
    flag.
    """
    #
    # Test: select_params and param_selections
    #

    # TODO: make test config file with materials param selector, then uncomment
    # removed tests below
    hierarchies = ['nh', 'ih']
    #materials = ['iron', 'pyrolite']
    materials = []

    t23 = dict(
        ih=49.5 * ureg.deg,
        nh=42.3 * ureg.deg
    )
    YeO = dict(
        iron=0.4656,
        pyrolite=0.4957
    )

    # Instantiate with two pipelines: first has both nh/ih and iron/pyrolite
    # param selectors, while the second only has nh/ih param selectors.
    dm = DistributionMaker(
        ['settings/pipeline/example.cfg', 'settings/pipeline/example.cfg']
    )

    #current_mat = 'iron'
    current_hier = 'nh'

    #for new_hier, new_mat in product(hierarchies, materials):
    for new_hier in hierarchies:
        #assert dm.param_selections == sorted([current_hier, current_mat]), \
        #        str(dm.param_selections)
        #assert dm.param_selections == sorted([current_hier, 'earth']), str(dm.param_selections)
        assert dm.params.theta23.value == t23[current_hier], str(dm.params.theta23)
        #assert dm.params.YeO.value == YeO[current_mat], str(dm.params.YeO)

        # Select just the hierarchy
        dm.select_params(new_hier)
        #assert dm.param_selections == sorted([new_hier, current_mat]), \
        #        str(dm.param_selections)
        #assert dm.param_selections == sorted([new_hier, 'earth']), str(dm.param_selections)
        assert dm.params.theta23.value == t23[new_hier], str(dm.params.theta23)
        #assert dm.params.YeO.value == YeO[current_mat], str(dm.params.YeO)

        ## Select just the material
        #dm.select_params(new_mat)
        #assert dm.param_selections == sorted([new_hier, new_mat]), \
        #        str(dm.param_selections)
        #assert dm.params.theta23.value == t23[new_hier], \
        #        str(dm.params.theta23)
        #assert dm.params.YeO.value == YeO[new_mat], \
        #        str(dm.params.YeO)

        # Reset both to "current"
        #dm.select_params([current_mat, current_hier])
        dm.select_params(current_hier)
        #assert dm.param_selections == sorted([current_hier, current_mat]), \
        #        str(dm.param_selections)
        #assert dm.param_selections == sorted([current_hier, 'earth']), str(dm.param_selections)
        assert dm.params.theta23.value == t23[current_hier], str(dm.params.theta23)
        #assert dm.params.YeO.value == YeO[current_mat], str(dm.params.YeO)

        ## Select both hierarchy and material
        #dm.select_params([new_mat, new_hier])
        #assert dm.param_selections == sorted([new_hier, new_mat]), \
        #        str(dm.param_selections)
        #assert dm.params.theta23.value == t23[new_hier], \
        #        str(dm.params.theta23)
        #assert dm.params.YeO.value == YeO[new_mat], \
        #        str(dm.params.YeO)

        #current_hier = new_hier
        #current_mat = new_mat

    # test profile flag
    p_cfg = 'settings/pipeline/example.cfg'
    p = Pipeline(p_cfg, profile=True)
    dm = DistributionMaker(pipelines=p)
    # default init using Pipeline instance shouldn't negate
    assert dm.pipelines[0].profile
    # but explicit request should
    dm.profile = False
    assert not dm.pipelines[0].profile
    # now init from cfg path and request profile
    dm = DistributionMaker(pipelines=p_cfg, profile=True)
    assert dm.pipelines[0].profile
    # explicitly request no profile
    dm = DistributionMaker(pipelines=p_cfg, profile=False)
    assert not dm.pipelines[0].profile
def parse_args():
    """Get command line arguments.

    Returns
    -------
    args : argparse.Namespace
        Parsed arguments: `pipeline` (list of config paths; `-p` is
        repeatable), `select` (param selections or None), `return_sum`,
        `outdir`, `pdf`, `png`, and `v` (verbosity count).
    """
    parser = ArgumentParser(
        description='''Generate, store, and plot a distribution from pipeline
        configuration file(s).''',
        formatter_class=ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        '-p', '--pipeline', type=str, required=True,
        metavar='CONFIGFILE', action='append',
        help='''Settings file for each pipeline (repeat for multiple).'''
    )
    parser.add_argument(
        '--select', metavar='PARAM_SELECTIONS', nargs='+', default=None,
        help='''Param selectors (separated by spaces) to use to override any
        defaults in the config file.'''
    )
    parser.add_argument(
        '--return-sum', action='store_true',
        help='''Return a sum of the MapSets output by the distribution maker's
        pipelines as a single map (as opposed to a list of MapSets, one per
        pipeline)'''
    )
    parser.add_argument(
        '--outdir', type=str, action='store',
        help='Directory into which to store the output'
    )
    parser.add_argument(
        '--pdf', action='store_true',
        help='''Produce pdf plot(s).'''
    )
    parser.add_argument(
        '--png', action='store_true',
        help='''Produce png plot(s).'''
    )
    parser.add_argument(
        '-v', action='count', default=None,
        help='Set verbosity level'
    )
    args = parser.parse_args()
    return args
def main(return_outputs=False):
    """Main; call as script with `return_outputs=False` or interactively with
    `return_outputs=True`.

    Builds a DistributionMaker from the command-line pipeline config(s),
    computes its outputs, optionally stores them to `--outdir` and produces
    pdf/png plots.

    Parameters
    ----------
    return_outputs : bool
        If True, return `(distribution_maker, outputs)` instead of None.
    """
    # Imported here (not at module level) to avoid pulling in plotting
    # dependencies unless main() is actually run
    from pisa.utils.plotter import Plotter

    args = parse_args()
    set_verbosity(args.v)

    plot_formats = []
    if args.pdf:
        plot_formats.append('pdf')
    if args.png:
        plot_formats.append('png')

    distribution_maker = DistributionMaker(pipelines=args.pipeline)  # pylint: disable=redefined-outer-name
    if args.select is not None:
        distribution_maker.select_params(args.select)

    outputs = distribution_maker.get_outputs(return_sum=args.return_sum)  # pylint: disable=redefined-outer-name
    if args.outdir:
        # TODO: unique filename: append hash (or hash per pipeline config)
        fname = 'distribution_maker_outputs.json.bz2'
        mkdir(args.outdir)
        fpath = expand(os.path.join(args.outdir, fname))
        to_file(outputs, fpath)

    if args.outdir and plot_formats:
        my_plotter = Plotter(
            outdir=args.outdir,
            fmt=plot_formats, log=False,
            annotate=False
        )
        for num, output in enumerate(outputs):
            my_plotter.plot_2d_array(
                output,
                fname='dist_output_%d' % num
            )

    if return_outputs:
        return distribution_maker, outputs


if __name__ == '__main__':
    distribution_maker, outputs = main(return_outputs=True)