Source code for pisa.core.detectors

#! /usr/bin/env python
"""
Detector class definition and a simple script to generate, save, and
plot distributions for different detectors from pipeline config file(s).
A detector is represented by a DistributionMaker.

DistributionMaker: A single detector
Detectors: A sequence of detectors
"""

from __future__ import absolute_import

from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from collections import OrderedDict
import inspect
import os
from tabulate import tabulate
from copy import deepcopy

import numpy as np

from pisa import ureg
from pisa.core.pipeline import Pipeline
from pisa.core.distribution_maker import DistributionMaker
from pisa.core.param import ParamSet, Param
from pisa.utils.config_parser import PISAConfigParser
from pisa.utils.fileio import expand, mkdir, to_file
from pisa.utils.hash import hash_obj
from pisa.utils.log import set_verbosity, logging, Levels
from pisa.utils.random_numbers import get_random_state


__all__ = ['Detectors', 'test_Detectors', 'parse_args', 'main']


[docs] class Detectors(object): """Container for one or more distribution makers, that represent different detectors. Parameters ---------- pipelines : Pipeline or convertible thereto, or iterable thereof A new pipline is instantiated with each object passed. Legal objects are already-instantiated Pipelines and anything interpret-able by the Pipeline init method. shared_params : Parameter to be treated the same way in all the distribution_makers that contain them. """ def __init__(self, pipelines, label=None, set_livetime_from_data=True, profile=False, shared_params=None): self.label = label self._source_code_hash = None self._profile = profile if shared_params == None: self.shared_params = [] else: self.shared_params = shared_params if isinstance(pipelines, (str, PISAConfigParser, OrderedDict, Pipeline)): pipelines = [pipelines] self._distribution_makers , self.det_names = [] , [] for pipeline in pipelines: if not isinstance(pipeline, Pipeline): pipeline = Pipeline(pipeline, profile=profile) name = pipeline.detector_name if name in self.det_names: self._distribution_makers[self.det_names.index(name)].append(pipeline) else: self._distribution_makers.append([pipeline]) self.det_names.append(name) if None in self.det_names and len(self.det_names) > 1: raise NameError('At least one of the used pipelines has no detector_name.') for i, pipelines in enumerate(self._distribution_makers): self._distribution_makers[i] = DistributionMaker(pipelines=pipelines, set_livetime_from_data=set_livetime_from_data, profile=profile ) for sp in self.shared_params: N, n = 0, 0 for distribution_maker in self._distribution_makers: if sp in distribution_maker.params.names: N += 1 if sp in distribution_maker.params.free.names: n += 1 if N < 2: raise NameError(f'Shared param {sp} only exists in {N} detectors.') if n > 0 and n != N: raise NameError(f'Shared param {sp} exists in {N} detectors but only a free param in {n} detectors.') self.init_params() def __repr__(self): return self.tabulate(tablefmt="presto") def _repr_html_(self): return self.tabulate(tablefmt="html")
[docs] def tabulate(self, tablefmt="plain"): headers = ['DistributionMaker number', 'detector name', 'output_binning', 'output_key', 'profile'] colalign=["right"] + ["center"] * (len(headers) -1 ) table = [] for i, d in enumerate(self.distribution_makers): p = d.pipelines[0] #assuming binning and key are the same for all pipelines in DistributionMaker table.append([i, d.detector_name, p.output_binning, p.output_key, d.profile]) return tabulate(table, headers, tablefmt=tablefmt, colalign=colalign)
def __iter__(self): return iter(self._distribution_makers)
[docs] def report_profile(self, detailed=False, format_num_kwargs=None): """Report timing information on contained distribution makers. See `Pipeline.report_profile` for details. """ for distribution_maker in self._distribution_makers: print(distribution_maker.detector_name + ':') distribution_maker.report_profile( detailed=detailed, format_num_kwargs=format_num_kwargs )
@property def profile(self): return self._profile @profile.setter def profile(self, value): for d in self.distribution_makers: d.profile = value self._profile = value
[docs] def run(self): for distribution_maker in self: distribution_maker.run()
[docs] def setup(self): """Setup (reset) all distribution makers""" for d in self: d.setup()
[docs] def get_outputs(self, **kwargs): """Compute and return the outputs. Parameters ---------- **kwargs Passed on to each distribution_maker's `get_outputs` method. Returns ------- List of MapSets if `return_sum=True` or list of lists of MapSets if `return_sum=False` """ outputs = [distribution_maker.get_outputs(**kwargs) for distribution_maker in self] return outputs
[docs] def update_params(self, params): if isinstance(params,Param): params = ParamSet(params) for distribution_maker in self.distribution_makers: ps = deepcopy(params) for p in ps.names: if distribution_maker.detector_name in p: p_name = p.replace('_'+distribution_maker.detector_name, "") if p_name in ps.names: ps.remove(p_name) ps[p].name = p_name distribution_maker.update_params(ps) self.init_params()
[docs] def select_params(self, selections, error_on_missing=True): for distribution_maker in self: distribution_maker.select_params(selections=selections, error_on_missing=error_on_missing) self.init_params()
@property def distribution_makers(self): return self._distribution_makers @property def params(self): return self._params
[docs] def init_params(self): """Returns a ParamSet including all params of all detectors. First the shared params (if there are some), then all the "single detector" params. If two detectors use a parameter with the same name (but not shared), the name of the detector is added to the parameter name (except for the first detector). """ params = ParamSet() for p_name in self.shared_params: for distribution_maker in self: try: params.extend(distribution_maker.params[p_name]) break # shared param found, can continue with the next shared param except: continue # shared param was not in this DistributionMaker, so search in the next one for distribution_maker in self: for param in distribution_maker.params: if param.name in self.shared_params: continue # shared param is already in param set, can continue with the next param elif param.name in params.names: # two parameters with the same name but not shared # add detector name to the parameter name changed_param = deepcopy(param) changed_param.name = param.name + '_' + distribution_maker.detector_name params.extend(changed_param) else: params.extend(param) self._params = params
@property def shared_param_ind_list(self): """ A list of lists (one for each detector) containing the position of the shared params in the free params of the DistributionMaker (that belongs to the detector) together with their position in the shared parameter list. """ if not self.shared_params: return [] shared_param_ind_list = [] for distribution_maker in self: free_names = distribution_maker.params.free.names spi = [] for p_name in free_names: if p_name in self.shared_params: spi.append((free_names.index(p_name), self.shared_params.index(p_name))) shared_param_ind_list.append(spi) return shared_param_ind_list @property def param_selections(self): selections = None for distribution_maker in self: if selections != None and sorted(distribution_maker.param_selections) != selections: raise AssertionError('Different param_selections for different detectors.') selections = sorted(distribution_maker.param_selections) return selections @property def source_code_hash(self): """Hash for the source code of this object's class. Not meant to be perfect, but should suffice for tracking provenance of an object stored to disk that were produced by a Stage. """ if self._source_code_hash is None: self._source_code_hash = hash_obj(inspect.getsource(self.__class__)) return self._source_code_hash @property def hash(self): return hash_obj([self.source_code_hash] + [d.hash for d in self]) @property def num_events_per_bin(self): ''' returns list of arrays of bin indices where none of the pipelines in the respective distribution maker have MC events ''' num_events_per_bin = [] for d in self.distribution_makers: num_events_per_bin.append(d.num_events_per_bin) return num_events_per_bin @property def empty_bin_indices(self): '''Find indices where there are no events present ''' num_events_per_bin = self.num_events_per_bin indices = [] for i, d in enumerate(self.distribution_makers): empty_counts = num_events_per_bin[i] == 0 indices.append(np.where(empty_counts)[0]) return indices
[docs] def set_free_params(self, values): """Set free parameters' values. Parameters ---------- values : a list of quantities """ for dist_maker in self: dist_values = [] for dist_name in dist_maker.params.free.names: for name, value in zip(self.params.free.names, values): if name == dist_name: v = value if name == dist_name + '_' + dist_maker.detector_name: v = value dist_values.append(v) dist_maker.set_free_params(dist_values)
[docs] def randomize_free_params(self, random_state=None): if random_state is None: random = np.random else: random = get_random_state(random_state) n = len(self.params.free) rand = random.rand(n) self._set_rescaled_free_params(rand)
[docs] def reset_all(self): """Reset both free and fixed parameters to their nominal values.""" for d in self: d.reset_all()
[docs] def reset_free(self): """Reset only free parameters to their nominal values.""" for d in self: d.reset_free()
[docs] def set_nominal_by_current_values(self): """Define the nominal values as the parameters' current values.""" for d in self: d.set_nominal_by_current_values()
def _set_rescaled_free_params(self, rvalues): """Set free param values given a simple list of [0,1]-rescaled, dimensionless values """ if not isinstance(rvalues,list): rvalues = list(rvalues) if self.shared_params == []: for d in self: rp = [] for j in range(len(d.params.free)): rp.append(rvalues.pop(0)) d._set_rescaled_free_params(rp) else: sp = [] # first get the shared params for i in range(len(self.shared_params)): sp.append(rvalues.pop(0)) spi = self.shared_param_ind_list for i in range(len(self._distribution_makers)): rp = [] for j in range(len(self._distribution_makers[i].params.free) - len(spi[i])): rp.append(rvalues.pop(0)) for j in range(len(spi[i])): rp.insert(spi[i][j][0], sp[spi[i][j][1]]) self._distribution_makers[i]._set_rescaled_free_params(rp)
[docs] def test_Detectors(verbosity=Levels.WARN): from pisa.analysis.analysis import update_param_values_detector """Run a combination of two DeepCore detectors.""" p1_nu = Pipeline("settings/pipeline/IceCube_3y_neutrinos.cfg") p1_mu = Pipeline("settings/pipeline/IceCube_3y_muons.cfg") p1_nu.detector_name, p1_mu.detector_name = 'detector1', 'detector1' p2_nu = Pipeline("settings/pipeline/IceCube_3y_neutrinos.cfg") p2_mu = Pipeline("settings/pipeline/IceCube_3y_muons.cfg") p2_nu.detector_name, p2_mu.detector_name = 'detector2', 'detector2' # Initializing try: set_verbosity(Levels.INFO) logging.info(f'Initializing Detectors') set_verbosity(Levels.WARN) model = Detectors([p1_nu, p1_mu, p2_nu, p2_mu], shared_params=['deltam31', 'theta13', 'theta23', 'nue_numu_ratio', 'Barr_uphor_ratio', 'Barr_nu_nubar_ratio', 'delta_index', 'nutau_norm', 'nu_nc_norm', 'opt_eff_overall', 'opt_eff_lateral', 'opt_eff_headon', 'ice_scattering', 'ice_absorption', 'atm_muon_scale']) except Exception as err: msg = f"<< Error when initializing the Detectors >>" set_verbosity(verbosity) logging.error("=" * len(msg)) logging.error(msg) logging.error("=" * len(msg)) set_verbosity(Levels.TRACE) logging.exception(err) set_verbosity(verbosity) logging.error("#" * len(msg)) else: set_verbosity(verbosity) logging.info("<< Successfully initialized Detectors >>") finally: set_verbosity(verbosity) # Get outputs try: set_verbosity(Levels.INFO) logging.info(f'Running Detectors (takes a bit)') set_verbosity(Levels.WARN) model.get_outputs() except Exception as err: msg = f"<< Error when running the Detectors >>" set_verbosity(verbosity) logging.error("=" * len(msg)) logging.error(msg) logging.error("=" * len(msg)) set_verbosity(Levels.TRACE) logging.exception(err) set_verbosity(verbosity) logging.error("#" * len(msg)) else: set_verbosity(verbosity) logging.info("<< Successfully ran Detectors >>") finally: set_verbosity(verbosity) # Change parameters set_verbosity(Levels.INFO) logging.info(f'Change parameters') set_verbosity(Levels.WARN) model.reset_free() model.params.opt_eff_lateral.value = 20 # shared parameter model.params.aeff_scale.value = 2 # only changes value for detector1 update_param_values_detector(model, model.params) o0 = model.distribution_makers[0].params.opt_eff_lateral.value.magnitude o1 = model.distribution_makers[1].params.opt_eff_lateral.value.magnitude a0 = model.distribution_makers[0].params.aeff_scale.value.magnitude a1 = model.distribution_makers[1].params.aeff_scale.value.magnitude if not o0 == 20 or not o1 == 20: msg = f"<< Error when changing shared parameter >>" set_verbosity(verbosity) logging.error("=" * len(msg)) logging.error(msg) logging.error("=" * len(msg)) elif not a0 == 2 or not a1 == 1: msg = f"<< Error when changing non-shared parameter >>" set_verbosity(verbosity) logging.error("=" * len(msg)) logging.error(msg) logging.error("=" * len(msg)) else: set_verbosity(verbosity) logging.info("<< Successfully changed parameters >>") # Unit test set_verbosity(Levels.INFO) logging.info(f'Unit test') set_verbosity(Levels.WARN) hierarchies = ['nh', 'ih'] t23 = dict( ih=49.5 * ureg.deg, nh=42.3 * ureg.deg ) current_hier = 'nh' for new_hier in hierarchies: #assert model.param_selections == [current_hier], str(model.param_selections) assert model.params.theta23.value == t23[current_hier], str(model.params.theta23) # Select the hierarchy model.select_params(new_hier) #assert model.param_selections == [new_hier], str(model.param_selections) assert model.params.theta23.value == t23[new_hier], str(model.params.theta23) # Reset to "current" model.select_params(current_hier) #assert model.param_selections == [current_hier], str(model.param_selections) assert model.params.theta23.value == t23[current_hier], str(model.params.theta23) set_verbosity(verbosity) logging.info("<< Successfully ran unit test >>") # Done logging.info(f'Detectors class test done')
[docs] def parse_args(): """Get command line arguments""" parser = ArgumentParser( description='''Generate, store, and plot distributions from different pipeline configuration file(s) for one or more detectors.''', formatter_class=ArgumentDefaultsHelpFormatter ) parser.add_argument( '-p', '--pipeline', type=str, required=True, metavar='CONFIGFILE', action='append', help='''Settings file for each pipeline (repeat for multiple).''' ) parser.add_argument( '--shared_params', type=str, default=None, action='append', help='''Shared parameters for multi det analysis (repeat for multiple).''' ) parser.add_argument( '--select', metavar='PARAM_SELECTIONS', nargs='+', default=None, help='''Param selectors (separated by spaces) to use to override any defaults in the config file.''' ) parser.add_argument( '--return-sum', action='store_true', help='''Return a sum of the MapSets output by the distribution maker's pipelines as a single map (as opposed to a list of MapSets, one per pipeline)''' ) parser.add_argument( '--outdir', type=str, action='store', help='Directory into which to store the output' ) parser.add_argument( '--pdf', action='store_true', help='''Produce pdf plot(s).''' ) parser.add_argument( '--png', action='store_true', help='''Produce png plot(s).''' ) parser.add_argument( '-v', action='count', default=None, help='Set verbosity level' ) args = parser.parse_args() return args
[docs] def main(return_outputs=False): """Main; call as script with `return_outputs=False` or interactively with `return_outputs=True`""" from pisa.utils.plotter import Plotter args = parse_args() set_verbosity(args.v) plot_formats = [] if args.pdf: plot_formats.append('pdf') if args.png: plot_formats.append('png') detectors = Detectors(args.pipeline,shared_params=args.shared_params) Names = detectors.det_names if args.select is not None: detectors.select_params(args.select) outputs = detectors.get_outputs(return_sum=args.return_sum) #outputs = outputs[0].fluctuate( # method='poisson', random_state=get_random_state([0, 0, 0])) if args.outdir: # TODO: unique filename: append hash (or hash per pipeline config) fname = 'detectors_outputs.json.bz2' mkdir(args.outdir) fpath = expand(os.path.join(args.outdir, fname)) to_file(outputs, fpath) if args.outdir and plot_formats: my_plotter = Plotter( outdir=args.outdir, fmt=plot_formats, log=False, annotate=False ) for num, output in enumerate(outputs): if args.return_sum: my_plotter.plot_2d_array( output, fname=Names[num] ) else: for out in output: my_plotter.plot_2d_array( out, fname=Names[num] ) if return_outputs: return detectors, outputs
if __name__ == '__main__': detectors, outputs = main(return_outputs=True)