"""
Common tools for performing an analysis collected into a single class
`Analysis` that can be subclassed by specific analyses.
"""
from collections.abc import Sequence, Mapping
from collections import OrderedDict
from copy import deepcopy
from functools import partial
from operator import setitem
from itertools import product
import re
import sys
import time
import warnings
import numpy as np
import scipy
from scipy import optimize
# this is needed for the take_step option in basinhopping
from scipy._lib._util import check_random_state
# to convert from dict constraint type for differential evolution
from scipy.optimize._constraints import old_constraint_to_new
from iminuit import Minuit
import nlopt
from pkg_resources import parse_version
import pisa
from pisa import EPSILON, FTYPE, ureg
from pisa.core.map import Map, MapSet
from pisa.core.param import ParamSet, Param
from pisa.core.pipeline import Pipeline
from pisa.utils.comparisons import recursiveEquality, FTYPE_PREC, ALLCLOSE_KW
from pisa.utils.log import logging, set_verbosity
from pisa.utils.fileio import to_file
from pisa.utils.random_numbers import get_random_state
from pisa.utils.stats import (METRICS_TO_MAXIMIZE, METRICS_TO_MINIMIZE,
LLH_METRICS, CHI2_METRICS, weighted_chi2,
it_got_better, is_metric_to_maximize)
__all__ = ['MINIMIZERS_USING_SYMM_GRAD', 'MINIMIZERS_ACCEPTING_CONSTRS',
'scipy_constraints_to_callables', 'get_nlopt_inequality_constraint_funcs',
'set_minimizer_defaults', 'validate_minimizer_settings',
'Counter', 'Analysis', 'BasicAnalysis']
__author__ = 'J.L. Lanfranchi, P. Eller, S. Wren, E. Bourbeau, A. Trettin, T. Ehrhardt'
__license__ = '''Copyright (c) 2014-2024, The IceCube Collaboration
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.'''
SUPPORTED_LOCAL_SCIPY_MINIMIZERS=(
'cobyla', 'l-bfgs-b', 'nelder-mead', 'slsqp', 'trust-constr'
)
"""Local scipy minimizers that PISA currently supports via this interface."""
MINIMIZERS_USING_SYMM_GRAD = ('l-bfgs-b', 'slsqp')
"""Minimizers that use symmetrical steps on either side of a point to compute
gradients. See https://github.com/scipy/scipy/issues/4916"""
MINIMIZERS_ACCEPTING_CONSTRS = ('cobyla', 'slsqp', 'trust-constr', 'cobyqa')
"""Minimizers that allow constraints to be passed. According to
scipy docs, cobyla and slsqp require dictionaries, whereas
trust-constr and cobyqa require constraints to be passed as `LinearConstraint`
or `NonlinearConstraint` instances. However, as of now, the conversion to the
form required by the minimizer is done internally by scipy. Hence, this global
variable merely serves documentation purposes right now. Note that cobyqa is
only added in scipy version 1.14.0."""
EVAL_MSG = ('Using eval() is potentially dangerous as it can execute '
'arbitrary code! Do not store your config file in a place '
'where others have writing access!')
"""Repeatedly implemented warning about evaluating expressions representing
(in)equality constraints."""
# TODO: Observed or known scipy minimization issues that might be fixable with scipy updates:
# * SHGO ignores various local minimizer options (https://github.com/scipy/scipy/issues/20028)
# * unreliable global scipy minimization with constraints: non-negligible constraint
# violations or stepping out of bounds
[docs]
def scipy_constraints_to_callables(constr_dicts, hypo_maker):
"""Convert constraints expressions in terms of ParamSets
into callables for scipy. Overwrites "fun" entries
in `constr_dicts`.
"""
def constr_func(x, constr_func_params):
hypo_maker._set_rescaled_free_params(x)
if hypo_maker.__class__.__name__ == "Detectors":
# updates values for ALL detectors
update_param_values_detector(hypo_maker, hypo_maker.params.free)
return constr_func_params(hypo_maker.params)
logging.warning(EVAL_MSG)
assert isinstance(constr_dicts, Sequence)
for cd in constr_dicts:
assert isinstance(cd, Mapping)
# the equality constraint is specified as a function that takes a
# ParamSet as its input
assert "fun" in cd
constr = cd["fun"]
logging.debug(f"adding scipy constraint: {constr}")
if callable(constr):
constr_func_params = constr
else:
constr_func_params = eval(constr)
t = type(constr_func_params)
if not callable(constr_func_params):
raise TypeError(f"Evaluated object not a callable, but {t}.")
# overwrite
cd["fun"] = partial(constr_func, constr_func_params=constr_func_params)
def make_scipy_local_minimizer_kwargs(minimizer_settings, constrs=None, bounds=None):
"""Small helper function containing common logic for
creating minimizer keyword args in calls to global
routines via their scipy interface."""
minimizer_kwargs = deepcopy(minimizer_settings)
minimizer_kwargs["method"] = minimizer_settings["method"]["value"]
minimizer_kwargs["options"] = minimizer_settings["options"]["value"]
if constrs is not None:
minimizer_kwargs["constraints"] = constrs
if bounds is not None:
minimizer_kwargs["bounds"] = bounds
return minimizer_kwargs
[docs]
def get_nlopt_inequality_constraint_funcs(method_kwargs, hypo_maker):
"""Convert constraints expressions in terms of ParamSets
into callables for nlopt. Evals expression(s) from `method_kwargs`
and returns list of callables.
"""
def ineq_func(x, grad, ineq_func_params):
if grad.size > 0:
raise RuntimeError("gradients not supported")
hypo_maker._set_rescaled_free_params(x)
if hypo_maker.__class__.__name__ == "Detectors":
# updates values for ALL detectors
update_param_values_detector(hypo_maker, hypo_maker.params.free)
# In NLOPT, the inequality function must stay negative, while in
# scipy, the inequality function must stay positive. We keep with
# the scipy convention by flipping the sign.
return -ineq_func_params(hypo_maker.params)
assert "ineq_constraints" in method_kwargs
logging.warning(EVAL_MSG)
constr_list = method_kwargs["ineq_constraints"]
if isinstance(constr_list, str):
constr_list = [constr_list]
ineq_funcs = []
for constr in constr_list:
# the inequality function is specified as a function that takes a
# ParamSet as its input
logging.debug(f"adding nlopt constraint (must stay positive): {constr}")
if callable(constr):
ineq_func_params = constr
else:
ineq_func_params = eval(constr)
t = type(ineq_func_params)
if not callable(ineq_func_params):
raise TypeError(f"Evaluated object not a callable, but {t}.")
ineq_funcs.append(partial(ineq_func, ineq_func_params=ineq_func_params))
return ineq_funcs
def make_scipy_constraint_dict(constr_type, fun, jac=None, args=None):
"""Makes a constraint dictionary in the form accepted by scipy,
see e.g. https://docs.scipy.org/doc/scipy-1.13.1/reference/generated/scipy.optimize.minimize.html"""
assert constr_type in ["eq", "ineq"]
t = type(fun)
if not callable(fun):
raise TypeError("Constraint function has to be callable, not {t}.")
constr_dict = {'type': constr_type, 'fun': fun}
if jac is not None:
t = type(jac)
if not callable(jac):
raise TypeError("Jacobian has to be callable, not {t}.")
constr_dict['jac'] = jac
if args is not None:
assert isinstance(args, Sequence)
constr_dict['args'] = args
return constr_dict
def merge_mapsets_together(mapset_list=None):
'''Handle merging of multiple MapSets, when they come in
the shape of a dict
TODO: might not be required any longer (see various comments on
generalized_poisson_llh)
'''
if isinstance(mapset_list[0], Mapping):
new_dict = OrderedDict()
for S in mapset_list:
for k,v in S.items():
if k not in new_dict.keys():
new_dict[k] = [m for m in v.maps]
else:
new_dict[k] += [m for m in v.maps]
for k,v in new_dict.items():
new_dict[k] = MapSet(v)
else:
raise TypeError('This function only works when mapsets are provided as dicts')
return new_dict
[docs]
def set_minimizer_defaults(minimizer_settings):
"""Fill in default values for minimizer settings.
Parameters
----------
minimizer_settings : dict
Returns
-------
new_minimizer_settings : dict
"""
new_minimizer_settings = dict(
method=dict(value='', desc=''),
options=dict(value=dict(), desc=dict())
)
new_minimizer_settings.update(minimizer_settings)
sqrt_ftype_eps = np.sqrt(np.finfo(FTYPE).eps)
opt_defaults = {}
method = minimizer_settings['method']['value'].lower()
if method == 'l-bfgs-b' and FTYPE == np.float64:
# From `scipy.optimize.lbfgsb._minimize_lbfgsb`
opt_defaults.update(dict(
maxcor=10, ftol=2.2204460492503131e-09, gtol=1e-5, eps=1e-8,
maxfun=15000, maxiter=15000, iprint=-1, maxls=20
))
elif method == 'l-bfgs-b' and FTYPE == np.float32:
# Adapted to lower precision
opt_defaults.update(dict(
maxcor=10, ftol=sqrt_ftype_eps, gtol=1e-3, eps=1e-5,
maxfun=15000, maxiter=15000, iprint=-1, maxls=20
))
elif method == 'slsqp' and FTYPE == np.float64:
opt_defaults.update(dict(
maxiter=100, ftol=1e-6, iprint=0, eps=sqrt_ftype_eps,
))
elif method == 'slsqp' and FTYPE == np.float32:
opt_defaults.update(dict(
maxiter=100, ftol=1e-4, iprint=0, eps=sqrt_ftype_eps
))
elif method == 'cobyla':
opt_defaults.update(dict(
rhobeg=0.1, maxiter=1000, tol=1e-4,
))
elif method == 'cobyqa':
# just make this solver available for now
pass
elif method == 'trust-constr':
opt_defaults.update(dict(
maxiter=200, gtol=1e-4, xtol=1e-4, barrier_tol=1e-4
))
elif method == 'nelder-mead':
opt_defaults.update(dict(
maxfev=1000, xatol=1e-4, fatol=1e-4
))
else:
raise ValueError(f'Unhandled minimizer "{method}" / FTYPE={FTYPE}')
opt_defaults.update(new_minimizer_settings['options']['value'])
new_minimizer_settings['options']['value'] = opt_defaults
# Populate the descriptions with something
for opt_name in new_minimizer_settings['options']['value']:
if opt_name not in new_minimizer_settings['options']['desc']:
new_minimizer_settings['options']['desc'] = 'no desc'
return new_minimizer_settings
[docs]
def validate_minimizer_settings(minimizer_settings):
"""Validate minimizer settings.
Supported minimizers are the same as in `set_minimizer_defaults`.
See source for specific thresholds set.
Parameters
----------
minimizer_settings : dict
Raises
------
ValueError
If any minimizer settings are deemed to be invalid.
"""
ftype_eps = np.finfo(FTYPE).eps
method = minimizer_settings['method']['value'].lower()
options = minimizer_settings['options']['value']
if method == 'l-bfgs-b':
must_have = ('maxcor', 'ftol', 'gtol', 'eps', 'maxfun', 'maxiter',
'maxls')
may_have = must_have + ('args', 'jac', 'bounds', 'disp', 'iprint',
'callback')
elif method == 'slsqp':
must_have = ('maxiter', 'ftol', 'eps')
may_have = must_have + ('args', 'jac', 'bounds', 'constraints',
'iprint', 'disp', 'callback')
elif method == 'cobyla':
must_have = ('maxiter', 'rhobeg', 'tol')
may_have = must_have + ('disp', 'catol', 'constraints')
elif method == 'cobyqa':
assert parse_version(scipy.__version__) >= parse_version('1.14.0')
must_have = ()
may_have = must_have + ('disp', 'maxiter', 'maxfev', 'f_target',
'feasibility_tol', 'initial_tr_radius',
'final_tr_radius', 'scale', 'constraints')
elif method == 'trust-constr':
must_have = ('maxiter', 'gtol', 'xtol', 'barrier_tol')
may_have = must_have + ('sparse_jacobian', 'initial_tr_radius',
'initial_constr_penalty', 'constraints',
'initial_barrier_parameter',
'initial_barrier_tolerance',
'factorization_method',
'finite_diff_rel_step',
'verbose', 'disp')
elif method == 'nelder-mead':
must_have = ('maxfev', 'xatol', 'fatol')
may_have = must_have + ('disp', 'maxiter', 'return_all',
'initial_simplex', 'adaptive',
'bounds')
else:
raise ValueError(f'Unhandled minimizer "{method}" / FTYPE={FTYPE}')
missing = set(must_have).difference(set(options))
excess = set(options).difference(set(may_have))
if missing:
raise ValueError('Missing the following options for %s minimizer: %s'
% (method, missing))
if excess:
raise ValueError('Excess options for %s minimizer: %s'
% (method, excess))
eps_msg = '%s minimizer option %s(=%e) is < %d * %s_EPS(=%e)'
eps_gt_msg = '%s minimizer option %s(=%e) is > %e'
fp64_eps = np.finfo(np.float64).eps
if method == 'l-bfgs-b':
err_lim, warn_lim = 2, 10
for s in ['ftol', 'gtol']:
val = options[s]
if val < err_lim * ftype_eps:
raise ValueError(eps_msg % (method, s, val, err_lim, 'FTYPE',
ftype_eps))
if val < warn_lim * ftype_eps:
logging.warning(eps_msg, method, s, val, warn_lim, 'FTYPE', ftype_eps)
val = options['eps']
err_lim, warn_lim = 1, 10
if val < err_lim * fp64_eps:
raise ValueError(eps_msg % (method, 'eps', val, err_lim, 'FP64',
fp64_eps))
if val < warn_lim * ftype_eps:
logging.warning(eps_msg, method, 'eps', val, warn_lim, 'FTYPE', ftype_eps)
err_lim, warn_lim = 0.25, 0.1
if val > err_lim:
raise ValueError(eps_gt_msg % (method, 'eps', val, err_lim))
if val > warn_lim:
logging.warning(eps_gt_msg, method, 'eps', val, warn_lim)
elif method == 'slsqp':
err_lim, warn_lim = 2, 10
val = options['ftol']
if val < err_lim * ftype_eps:
raise ValueError(eps_msg % (method, 'ftol', val, err_lim, 'FTYPE',
ftype_eps))
if val < warn_lim * ftype_eps:
logging.warning(eps_msg, method, 'ftol', val, warn_lim, 'FTYPE', ftype_eps)
val = options['eps']
err_lim, warn_lim = 1, 10
if val < err_lim * fp64_eps:
raise ValueError(eps_msg % (method, 'eps', val, 1, 'FP64',
fp64_eps))
if val < warn_lim * ftype_eps:
logging.warning(eps_msg, method, 'eps', val, warn_lim, 'FP64', fp64_eps)
err_lim, warn_lim = 0.25, 0.1
if val > err_lim:
raise ValueError(eps_gt_msg % (method, 'eps', val, err_lim))
if val > warn_lim:
logging.warning(eps_gt_msg, method, 'eps', val, warn_lim)
elif method == 'cobyla':
if options['rhobeg'] > 0.5:
raise ValueError('starting step-size > 0.5 will overstep boundary')
if options['rhobeg'] < 1e-2:
logging.warning('starting step-size is very low, convergence will be slow')
def get_separate_octant_params(
hypo_maker, angle_name, inflection_point, tolerance=None
):
'''
This function creates versions of the angle param that are confined to
a single octant. It does this for both octant cases. This is used to allow
fits to be done where only one of the octants is allowed. The fit can then
be done for the two octant cases and compared to find the best fit.
Parameters
----------
hypo_maker : DistributionMaker or Detector
The hypothesis maker being used by the fitter
angle_name : string
Name of the angle for which to create separate octant params.
inflection_point : quantity
Point distinguishing between the two octants, e.g. 45 degrees
tolerance : quantity
If the starting value is closer to the inflection point than the value of the
tolerance, it is offset away from the inflection point by this amount.
Returns
-------
angle_orig : Param
angle param as it was before applying the octant separation
angle_case1 : Param
angle param confined to first octant
angle_case2 : Param
angle param confined to second octant
'''
# Reset angle before starting
angle = hypo_maker.params[angle_name]
angle.reset()
# Store the original theta23 param before we mess with it
# WARNING: Do not copy here, you want the original object (since this relates to the underlying
# ParamSelector from which theta23 is extracted). Otherwise end up with an incosistent state
# later (e.g. after a new call to ParamSelector.select_params, this copied, and potentially
# modified param will be overwtiten by the original).
angle_orig = angle
# Get the octant definition
octants = (
(angle.range[0], inflection_point) ,
(inflection_point, angle.range[1])
)
# If angle is maximal (e.g. the transition between octants) or very close
# to it, offset it slightly to be clearly in one octant (note that fit can
# still move the value back to maximal). The reason for this is that
# otherwise checks on the parameter bounds (which include a margin for
# minimizer tolerance) can an throw exception.
if tolerance is None:
tolerance = 0.1 * ureg.degree
dist_from_inflection = angle.value - inflection_point
if np.abs(dist_from_inflection) < tolerance :
sign = -1. if dist_from_inflection < 0. else +1. # Note this creates +ve shift also for theta == 45 (arbitary)
angle.value = inflection_point + (sign * tolerance)
# Store the cases
angle_case1 = deepcopy(angle)
angle_case2 = deepcopy(angle)
# Get case 1, e.g. the current octant
case1_octant_index = 0 if angle_case1.value < inflection_point else 1
angle_case1.range = octants[case1_octant_index]
angle_case1.nominal_value = angle_case1.value
# Also get case 2, e.g. the other octant
case2_octant_index = 0 if case1_octant_index == 1 else 1
angle_case2.value = 2*inflection_point - angle_case2.value
# Also setting nominal value so that `reset_free` won't try to set it out of bounds
angle_case2.nominal_value = angle_case2.value
angle_case2.range = octants[case2_octant_index]
return angle_orig, angle_case1, angle_case2
def update_param_values(
hypo_maker,
params,
update_nominal_values=False,
update_range=False,
update_is_fixed=False
):
"""
Update just the values of parameters of a DistributionMaker *without* replacing
the memory references inside.
This should be used in place of `hypo_maker.update_params(params)` unless one
explicitly wants to replace the memory references to which the parameters in
the DistributionMaker are pointing.
"""
# it is possible that only a single param is given
if isinstance(params, Param):
params = [params]
if isinstance(hypo_maker, Pipeline):
hypo_maker = [hypo_maker]
for p in params:
for pipeline in hypo_maker:
if p.name not in pipeline.params.names: continue
# it is crucial that we update the range first because the value
# of the parameter in params might lie outside the range of those in
# hypo_maker.
if update_range:
pipeline.params[p.name].range = p.range
pipeline.params[p.name].value = p.value
if update_nominal_values:
pipeline.params[p.name].nominal_value = p.nominal_value
if update_is_fixed:
pipeline.params[p.name].is_fixed = p.is_fixed
def update_param_values_detector(
hypo_maker,
params,
update_nominal_values=False,
update_range=False,
update_is_fixed=False
):
"""
Modification of the update_param_values function to use with the Detectors class.
"""
assert hypo_maker.__class__.__name__ == "Detectors", "hypo_maker is not Detectors class"
if isinstance(params, Param): params = ParamSet(params)
for distribution_maker in hypo_maker:
ps = deepcopy(params)
for p in ps.names:
if distribution_maker.detector_name in p:
p_name = p.replace('_'+distribution_maker.detector_name, "")
if p_name in ps.names:
ps.remove(p_name)
ps[p].name = p_name
update_param_values(distribution_maker, ps,
update_nominal_values, update_range, update_is_fixed)
hypo_maker.init_params()
# TODO: move this to a central location prob. in utils
[docs]
class Counter():
"""Simple counter object for use as a minimizer callback."""
def __init__(self, i=0):
self._count = i
def __str__(self):
return str(self._count)
def __repr__(self):
return str(self)
def __iadd__(self, inc):
self._count += inc
[docs]
def reset(self):
"""Reset counter"""
self._count = 0
@property
def count(self):
"""int : Current count"""
return self._count
class BoundedRandomDisplacement():
"""
Add a bounded random displacement of maximum size `stepsize` to each coordinate
Calling this updates `x` in-place.
Parameters
----------
stepsize : float, optional
Maximum stepsize in any dimension
bounds : pair of float or sequence of pairs of float
Bounds on x
random_gen : {None, `np.random.RandomState`, `np.random.Generator`}
The random number generator that generates the displacements
"""
def __init__(self, stepsize=0.5, bounds=(0, 1), random_gen=None):
self.stepsize = stepsize
self.random_gen = check_random_state(random_gen)
self.bounds = np.array(bounds).T
def __call__(self, x):
x += self.random_gen.uniform(-self.stepsize, self.stepsize,
np.shape(x))
x = np.clip(x, *self.bounds) # bounds are automatically broadcast
return x
class HypoFitResult():
"""Holds all relevant information about a fit result."""
_state_attrs = ["metric", "metric_val", "params", "param_selections",
"hypo_asimov_dist", "detailed_metric_info", "minimizer_time",
"num_distributions_generated", "minimizer_metadata", "fit_history"]
# TODO: initialize from serialized state
def __init__(
self,
metric=None,
metric_val=None,
data_dist=None,
hypo_maker=None,
minimizer_time=None,
num_distributions_generated=None,
minimizer_metadata=None,
fit_history=None,
other_metrics=None,
counter=None,
include_detailed_metric_info=False,
):
self.metric = metric
self.metric_val = metric_val
# deepcopy done in setter function
self.params = None
self.hypo_asimov_dist = None
if hypo_maker is not None:
self.params = hypo_maker.params
self.param_selections = hypo_maker.param_selections
# Record the distribution with the optimal param values
self.hypo_asimov_dist = hypo_maker.get_outputs(return_sum=True)
self.detailed_metric_info = None
if minimizer_time is not None:
self.minimizer_time = minimizer_time * ureg.sec
self.num_distributions_generated = num_distributions_generated
self.minimizer_metadata = minimizer_metadata
self.fit_history = fit_history
if include_detailed_metric_info:
msg = "missing input to calculate detailed metric info"
assert hypo_maker is not None, msg
assert data_dist is not None, msg
assert metric is not None, msg
# this passes through the setter method, but it should just pass through
# without actually doing anything
if hypo_maker.__class__.__name__ == "Detectors":
self.detailed_metric_info = [self.get_detailed_metric_info(
data_dist=data_dist[i], hypo_asimov_dist=self.hypo_asimov_dist[i],
params=hypo_maker.distribution_makers[i].params, metric=metric[i],
other_metrics=other_metrics, detector_name=hypo_maker.det_names[i], hypo_maker=hypo_maker,
) for i in range(len(data_dist))]
elif isinstance(data_dist, list): # DistributionMaker object with variable binning
self.detailed_metric_info = [self.get_detailed_metric_info(
data_dist=data_dist[i], hypo_asimov_dist=self.hypo_asimov_dist[i],
params=hypo_maker.params, metric=metric[0],
other_metrics=other_metrics, detector_name=hypo_maker.detector_name, hypo_maker=hypo_maker,
) for i in range(len(data_dist))]
else: # DistributionMaker object with regular binning
if metric[0] == 'generalized_poisson_llh':
raise NotImplementedError(
"generalized_poisson_llh isn't correctly implemented any longer!"
)
# FIXME: these `output_mode` and `force_standard_output` kwargs were removed in
# https://github.com/icecube/pisa/commit/7a4e875aa7bdc52ea64a5270e9808d866d1395f3
generalized_poisson_dist = hypo_maker.get_outputs(return_sum=False, force_standard_output=False)
generalized_poisson_dist = merge_mapsets_together(mapset_list=generalized_poisson_dist)
else:
generalized_poisson_dist = None
self.detailed_metric_info = self.get_detailed_metric_info(
data_dist=data_dist, hypo_asimov_dist=self.hypo_asimov_dist, generalized_poisson_hypo=generalized_poisson_dist,
params=hypo_maker.params, metric=metric[0], other_metrics=other_metrics,
detector_name=hypo_maker.detector_name, hypo_maker=hypo_maker,
)
def __getitem__(self, i):
if i in self._state_attrs:
return getattr(self, i)
else:
raise ValueError(f"Unknown property {i}")
def _rehash(self):
self._param_hash = self._params.hash
@property
def params(self):
if self._params is None:
return None
# Safety feature: Because we pass this object as a record of the best fit
# through several function, we need to make sure the parameters are not
# corrupted on the way.
if self._params.hash != self._param_hash:
raise RuntimeError("The parameter hash doesn't match, parameters might have"
" been changed accidentally. This can happen if the parameters from"
" this object have been used to update the params inside a"
" DistributionMaker. Do not access private _params unless you are "
" certain that you want to change the parameters and then _rehash.")
# We MUST ensure that we don't hand out references to the internal params here
# because they could otherwise be manipulated inadvertently.
return deepcopy(self._params)
@params.setter
def params(self, newpars):
if newpars is None:
self._params = None
self._param_hash = None
return
elif isinstance(newpars, list):
# Comparing to `list`, not `Sequence`, because if `newpars` are a `ParamSet`
# the test for membership of `Sequence` would return `True`.
self._params = ParamSet(newpars)
else:
# The constructor of ParamSet is *not* a copy-constructor! The parameters
# making up the ParamSet are instead taken over by reference only. This is
# why we must use `deepcopy` here and can't just use ParamSet(newpars) for
# everything.
self._params = deepcopy(newpars)
self._rehash()
@property
def detailed_metric_info(self):
return self._detailed_metric_info
@detailed_metric_info.setter
def detailed_metric_info(self, new_info):
if new_info is None:
self._detailed_metric_info = None
elif isinstance(new_info, list):
self._detailed_metric_info = [
self.deserialize_detailed_metric_info(i) for i in new_info
]
else:
self._detailed_metric_info = self.deserialize_detailed_metric_info(new_info)
@property
def hypo_asimov_dist(self):
return self._hypo_asimov_dist
@hypo_asimov_dist.setter
def hypo_asimov_dist(self, new_had):
if isinstance(new_had, MapSet) or new_had is None:
self._hypo_asimov_dist = new_had
elif isinstance(new_had, Mapping):
# instantiating from serializable state
self._hypo_asimov_dist = MapSet(**new_had)
elif isinstance(new_had, list) and all(isinstance(item, MapSet) for item in new_had):
# for detector class output
self._hypo_asimov_dist = new_had
else:
raise ValueError("invalid format for hypo_asimov_dist")
@property
def state(self):
state = OrderedDict()
for attr in self._state_attrs:
val = getattr(self, attr)
if hasattr(val, 'state'):
val = val.state
setitem(state, attr, val)
return state
@property
def serializable_state(self):
return self.state
@classmethod
def from_state(cls, state):
assert set(state.keys()) == set(cls._state_attrs), "ill-formed state dict"
new_obj = cls()
for attr in cls._state_attrs:
setattr(new_obj, attr, state[attr])
return new_obj
@staticmethod
def get_detailed_metric_info(data_dist, hypo_maker, hypo_asimov_dist, params, metric,
generalized_poisson_hypo=None, other_metrics=None, detector_name=None):
"""Get detailed fit information, including e.g. maps that yielded the
metric.
Parameters
----------
data_dist
hypo_asimov_dist
params
metric
other_metrics
Returns
-------
detailed_metric_info : OrderedDict
"""
if other_metrics is None:
other_metrics = []
elif isinstance(other_metrics, str):
other_metrics = [other_metrics]
all_metrics = sorted(set([metric] + other_metrics))
detailed_metric_info = OrderedDict()
if detector_name is not None:
detailed_metric_info['detector_name'] = detector_name
for m in all_metrics:
name_vals_d = OrderedDict()
# if the metric is not generalized poisson, but the distribution is a dict,
# retrieve the 'weights' mapset from the distribution output
if m == 'generalized_poisson_llh':
name_vals_d['maps'] = data_dist.maps[0].generalized_poisson_llh(expected_values=generalized_poisson_hypo)
llh_binned = data_dist.maps[0].generalized_poisson_llh(expected_values=generalized_poisson_hypo, binned=True)
map_binned = Map(name=metric,
hist=np.reshape(llh_binned, data_dist.maps[0].shape),
binning=data_dist.maps[0].binning
)
name_vals_d['maps_binned'] = MapSet(map_binned)
name_vals_d['priors'] = params.priors_penalties(metric=metric)
detailed_metric_info[m] = name_vals_d
else:
# TODO: remove this case?
if isinstance(hypo_asimov_dist, OrderedDict):
hypo_asimov_dist = hypo_asimov_dist['weights']
if m == 'weighted_chi2':
actual_values = data_dist.hist['total']
expected_values = hypo_asimov_dist.hist['total']
d = {'output_binning': hypo_maker.pipelines[0].output_binning,
'output_key': 'bin_unc2'}
bin_unc2 = hypo_maker.get_outputs(return_sum=True, **d).hist['total']
metric_hists = weighted_chi2(actual_values, expected_values, bin_unc2)
name_vals_d['maps'] = OrderedDict(total=np.sum(metric_hists))
metric_hists = OrderedDict(total=metric_hists)
else:
name_vals_d['maps'] = data_dist.metric_per_map(
expected_values=hypo_asimov_dist, metric=m
)
metric_hists = data_dist.metric_per_map(
expected_values=hypo_asimov_dist, metric='binned_'+m
)
maps_binned = []
for asimov_map, metric_hist in zip(hypo_asimov_dist, metric_hists):
map_binned = Map(
name=asimov_map.name,
hist=np.reshape(metric_hists[metric_hist],
asimov_map.shape),
binning=asimov_map.binning
)
maps_binned.append(map_binned)
name_vals_d['maps_binned'] = MapSet(maps_binned)
name_vals_d['priors'] = params.priors_penalties(metric=metric)
detailed_metric_info[m] = name_vals_d
return detailed_metric_info
@staticmethod
def deserialize_detailed_metric_info(info_dict):
"""Re-instantiate all PISA objects that used to be in the dictionary."""
detailed_metric_info = OrderedDict()
if "detector_name" in info_dict.keys():
detailed_metric_info['detector_name'] = info_dict["detector_name"]
all_metrics = sorted(set(info_dict.keys()) - {"detector_name"})
for m in all_metrics:
name_vals_d = OrderedDict()
name_vals_d['maps'] = info_dict[m]["maps"]
if isinstance(info_dict[m]["maps_binned"], MapSet):
# If this has already been deserialized or never serialized in the
# first place, just pass through.
name_vals_d["maps_binned"] = info_dict[m]["maps_binned"]
else:
# Deserialize if necessary
name_vals_d['maps_binned'] = MapSet(**info_dict[m]["maps_binned"])
name_vals_d['priors'] = info_dict[m]["priors"]
detailed_metric_info[m] = name_vals_d
return detailed_metric_info
[docs]
class BasicAnalysis():
"""A bare-bones analysis that only fits a hypothesis to data.
Full analyses with functionality beyond just fitting (doing scans, for example)
should sub-class this class.
Every fit is run with the `fit_recursively` method, where the fit strategy is
defined by the three arguments `method`, `method_kwargs` and
`local_fit_kwargs` (see documentation of :py:meth:`fit_recursively` below for
other arguments.) The `method` argument determines which sub-routine should be
run, `method_kwargs` is a dictionary with any keyword arguments of that
sub-routine, and `local_fit_kwargs` is a dictionary (or list thereof) defining any
nested sub-routines that are run within the outer sub-routine. A sub-sub-routine
defined in `local_fit_kwargs` should again be a dictionary with the three keywords
`method`, `method_kwargs` and `local_fit_kwargs`. In this way, sub-routines
can be arbitrarily stacked to define complex fit strategies.
Attributes
----------
pprint : bool, default: True
Whether to show live updates of minimizer progress (overridden by
`blindness`).
blindness : bool or int, default: False
Whether to carry out a blind analysis. If True or 1, this hides actual
parameter values from display and disallows these (as well as Jacobian,
Hessian, etc.) from ending up in logfiles. If given an integer > 1, the
fitted parameters are also prevented from being stored in fit info
dictionaries.
Examples
--------
A canonical standard oscillation fit fits octants in `theta23` separately and then
runs a scipy minimizer to optimize locally in each octant. The arguments that would
produce that result when passed to `fit_recursively` are:
::
method = "octants"
method_kwargs = {
"angle": "theta23"
"inflection_point": 45 * ureg.deg
}
local_fit_kwargs = {
"method": "scipy",
"method_kwargs": minimizer_settings,
"local_fit_kwargs": None
}
Let's say we also have a CP violating phase `deltacp24` that we want to fit
separately per quadrant split at 90 degrees. We want this done within each
quadrant fit for `theta23`, making 4 fits in total. Then we would nest the
quadrant fit for `deltacp24` inside the octant fit like so:
::
method = "octants"
method_kwargs = {
"angle": "theta23"
"inflection_point": 45 * ureg.deg
}
local_fit_kwargs = {
"method": "octants",
"method_kwargs": {
"angle": "deltacp24",
"inflection_point": 90 * ureg.deg,
}
"local_fit_kwargs": {
"method": "scipy",
"method_kwargs": minimizer_settings,
"local_fit_kwargs": None
}
}
Let's suppose we want to apply a grid-scan global fit method to sterile mixing
parameters `theta24` and `deltam41`, but we want to marginalize over all other
parameters with a usual 3-flavor fit configuration. That could be achieved as
follows:
::
method = "grid_scan"
method_kwargs = {
"grid": {
"theta24": np.geomspace(1, 20, 3) * ureg.deg,
"deltam41": np.geomspace(0.01, 0.5, 4) * ureg["eV^2"],
},
"fix_grid_params": False,
}
local_fit_kwargs = {
"method": "octants",
"method_kwargs": {
"angle": "theta23",
"inflection_point": 45 * ureg.deg,
}
"local_fit_kwargs": {
"method": "scipy",
"method_kwargs": minimizer_settings,
"local_fit_kwargs": None
}
}
Instead of `scipy`, we can also use `iminuit` and `nlopt` for local minimization or
global searches by writing a dictionary with ``"method": "iminuit"`` or ``"method":
"nlopt"``, respectively.
**NLOPT Options**
NLOPT can be dropped in place of `scipy` and `iminuit` by writing a dictionary with
``"method": "nlopt"`` and choosing the algorithm by its name of the form
``NLOPT_{G,L}{N}_XXXX``. PISA supports all of the derivative-free global
(https://nlopt.readthedocs.io/en/latest/NLopt_Algorithms/#global-optimization) and
local
(https://nlopt.readthedocs.io/en/latest/NLopt_Algorithms/#local-derivative-free-optimization)
algorithms. Algorithms requiring gradients such as BFGS are not supported. To use
the Nelder-Mead algorithm, for example, the following settings could be used:
::
nlopt_settings = {
"method": "nlopt",
"method_kwargs": {
"algorithm": "NLOPT_LN_NELDERMEAD",
"ftol_abs": 1e-5,
"ftol_rel": 1e-5,
# other options that can be set here:
# xtol_abs, xtol_rel, stopval, maxeval, maxtime
# after maxtime seconds, stop and return best result so far
"maxtime": 60
},
"local_fit_kwargs": None # no further nesting available
}
and then run the fit with
::
best_fit_info = ana.fit_recursively(
data_dist,
dm,
"chi2",
None,
**nlopt_settings
)
. Of course, you can also nest the `nlopt_settings` dictionary in any of the
`octants`, `ranges` and so on by passing it as `local_fit_kwargs`.
*Adding constraints*
Adding inequality constraints to algorithms that support it is possible by writing a
lambda function in a string that expects to get the current parameters as a
`ParamSet` and returns a float. The result will satisfy that the passed function
stays `negative` (to be consistent with scipy). The string will be passed to
`eval` to build the callable function. For example, a silly way to bound
`delta_index` > 0.1 would be:
::
"method_kwargs": {
"algorithm": "NLOPT_LN_COBYLA",
"ftol_abs": 1e-5,
"ftol_rel": 1e-5,
"maxtime": 30,
"ineq_constraints": [
# be sure to convert parameters to their magnitude
"lambda params: params.delta_index.m - 0.1"
]
}
Adding inequality constraints to algorithms that don't support it can be done by
either nesting the local fit in the `constrained_fit` method or to use NLOPT's
AUGLAG method that adds a penalty for constraint violations internally. For example,
we could do this to fulfill the same constraint with the PRAXIS algorithm:
::
"method_kwargs": {
"algorithm": "NLOPT_AUGLAG",
"ineq_constraints":[
"lambda params: params.delta_index.m - 0.1"
],
"local_optimizer": {
# supports all the same options as above
"algorithm": "NLOPT_LN_PRAXIS",
"ftol_abs": 1e-5,
"ftol_rel": 1e-5,
}
}
*Using global searches with local subsidiary minimizers*
Some global searches, like evolutionary strategies, use local subsidiary minimizers.
These can be defined just as above by passing a dictionary with the settings to the
`local_optimizer` keyword. Note that, again, only gradient-free methods are
supported. Here is an example for the "Multi-Level single linkage" (MLSL) algorithm,
using PRAXIS as the local optimizer:
::
"method_kwargs": {
"algorithm": "NLOPT_G_MLSL_LDS",
"local_optimizer": {
"algorithm": "NLOPT_LN_PRAXIS",
"ftol_abs": 1e-5,
"ftol_rel": 1e-5,
}
}
For some evolutionary strategies such as ISRES, the `population` option can also
be set.
::
"method_kwargs": {
"algorithm": "NLOPT_GN_ISRES",
"population": 100,
}
**Custom fitting methods**
Custom fitting methods are added by subclassing the analysis. The fit function
name has to follow the scheme `_fit_{method}` where `method` is the name of the
fit method. For instance, the function for `scipy` is called `_fit_scipy` and can
be called by setting `"method": "scipy"` in the fit strategy dict.
The function has to accept the parameters `data_dist`, `hypo_maker`, `metric`,
`external_priors_penalty`, `method_kwargs`, and `local_fit_kwargs`. See docstring
of `fit_recursively` for descriptions of these arguments. The return value
of the function must be a `HypoFitResult` object. As an example, the following
sub-class of the BasicAnalysis has a custom fit method that, nonsensically,
always sets 42 degrees as the starting value for theta23:
::
class SubclassedAnalysis(BasicAnalysis):
def _fit_nonsense(
self, data_dist, hypo_maker, metric,
external_priors_penalty, method_kwargs, local_fit_kwargs
):
logging.info("Starting nonsense fit (setting theta23 to 42 deg)...")
for pipeline in hypo_maker:
if "theta23" in pipeline.params.free.names:
pipeline.params.theta23.value = 42 * ureg["deg"]
best_fit_info = self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
local_fit_kwargs["method"], local_fit_kwargs["method_kwargs"],
local_fit_kwargs["local_fit_kwargs"]
)
return best_fit_info
Now, the `nonsense` fit method can be used and nested with any other fit method
like so:
::
ana = SubclassedAnalysis()
local_minuit = OrderedDict(
method="iminuit",
method_kwargs={
"tol": 10,
},
local_fit_kwargs=None
)
local_nonsense_minuit = OrderedDict(
method="nonsense",
method_kwargs=None,
local_fit_kwargs=local_minuit
)
fit_result = ana.fit_recursively(
data_dist,
distribution_maker,
"chi2",
None,
**local_nonsense_minuit
)
"""
def __init__(self):
self._nit = 0
self.pprint = True
self.blindness = False
# TODO: Defer sub-fits to cluster
[docs]
def fit_recursively(
self, data_dist, hypo_maker, metric, external_priors_penalty,
method, method_kwargs=None, local_fit_kwargs=None
):
"""Recursively apply global search strategies with local sub-fits.
Parameters
----------
data_dist : Sequence of MapSets or MapSet
Data distribution to be fit. Can be an actual-, Asimov-, or pseudo-data
distribution (where the latter two are derived from simulation and so aren't
technically "data").
hypo_maker : Detectors or DistributionMaker
Creates the per-bin expectation values per map based on its param values.
Free params in the `hypo_maker` are modified by the minimizer to achieve a
"best" fit.
metric : string or iterable of strings
Metric by which to evaluate the fit. See documentation of Map.
external_priors_penalty : func
User defined prior penalty function, which takes `hypo_maker` and
`metric` as arguments and returns numerical value of penalty to the metric
value. It is expected sign of the penalty is correctly specified inside the
`external_priors_penalty` (e.g. negative for llh or positive for chi2).
method : str
Name of the sub-routine to be run. Currently, the options are `scipy`,
`octants`, `best_of`, `grid_scan`, `constrained`,
`ranges`, `condition`, `iminuit`, and `nlopt`.
method_kwargs : dict
Any keyword arguments taken by the sub-routine. May be `None` if the
sub-routine takes no additional arguments.
local_fit_kwargs : dict or list thereof
A dictionary defining subsidiary sub-routines with the keywords `method`,
`method_kwargs` and `local_fit_kwargs`. May be `None` if the
sub-routine is itself a local or global fit that runs no further subsidiary
fits.
"""
# Grab the hypo map
hypo_asimov_dist = hypo_maker.get_outputs(return_sum=True)
if isinstance(metric, str):
metric = [metric]
# Check number of used metrics
if hypo_maker.__class__.__name__ == "Detectors":
if len(metric) == 1: # One metric for all detectors
metric = list(metric) * len(hypo_maker.distribution_makers)
elif len(metric) != len(hypo_maker.distribution_makers):
raise IndexError(f"Number of defined metrics does not match with number of detectors.")
elif isinstance(hypo_asimov_dist, MapSet) or isinstance(hypo_asimov_dist, list):
# DistributionMaker object (list means variable binning)
assert len(metric) == 1, f"Only one metric allowed for DistributionMaker"
else:
raise NotImplementedError(f"hypo_maker returned output of type {type(hypo_asimov_dist)}")
# Before starting any fit, check if we already have a perfect match
# between data and template. This can happen if using pseudodata that
# was generated with the nominal values for parameters (which will also
# be the initial values in the fit). If this is the case, don't bother
# to fit and return results right away. TODO: This speedup is currently
# only enabled for a DistributionMaker with regular binning.
if isinstance(data_dist, MapSet) and data_dist.allclose(hypo_asimov_dist):
msg = 'Initial hypo matches data, no need for fit'
logging.info(msg)
# Get the metric value at this initial point
# This is for returning as part of the "fit" results
initial_metric_val = (
data_dist.metric_total(expected_values=hypo_asimov_dist, metric=metric[0])
+ hypo_maker.params.priors_penalty(metric=metric[0])
)
# Return fit results, even though didn't technically fit
return HypoFitResult(
metric,
initial_metric_val,
data_dist,
hypo_maker,
minimizer_time=0.,
minimizer_metadata={"success":True, "nit":0, "message":msg}, # Add some metadata in the format returned by `scipy.optimize.minimize`
fit_history=None,
other_metrics=None,
num_distributions_generated=0,
include_detailed_metric_info=True,
)
if method in ["fit_octants", "fit_ranges"]:
method = method.split("_")[1]
logging.warning(f"fit method 'fit_{method}' has been re-named to '{method}'")
# If made it here, we have a fit to do...
fit_function = getattr(self, f"_fit_{method}")
# Run the fit function
return fit_function(data_dist, hypo_maker, metric, external_priors_penalty,
method_kwargs, local_fit_kwargs)
def _fit_octants(self, data_dist, hypo_maker, metric, external_priors_penalty,
method_kwargs, local_fit_kwargs):
"""
A simple global optimization scheme that searches mixing angle octants.
"""
angle_name = method_kwargs["angle"]
if angle_name not in hypo_maker.params.free.names:
logging.info(f"{angle_name} is not a free parameter, skipping octant check")
return self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
local_fit_kwargs["method"], local_fit_kwargs["method_kwargs"],
local_fit_kwargs["local_fit_kwargs"]
)
inflection_point = method_kwargs["inflection_point"]
logging.info(f"Entering octant fit for angle {angle_name} with inflection "
f"point at {inflection_point}")
#### Removed, fitting always separately.
# Is there a reason not to fit separately, ever?
# fit_octants_separately = True
# if "fit_octants_separately" in method_kwargs.keys():
# fit_octants_separately = method_kwargs["fit_octants_separately"]
reset_free = True
if "reset_free" in method_kwargs.keys():
reset_free = method_kwargs["reset_free"]
if not reset_free:
# store so we can reset to the values we currently have rather than
# resetting free parameters back to their nominal value after the octant
# check
minimizer_start_params = deepcopy(hypo_maker.params)
tolerance = method_kwargs["tolerance"] if "tolerance" in method_kwargs else None
# Get new angle parameters each limited to one octant
ang_orig, ang_case1, ang_case2 = get_separate_octant_params(
hypo_maker, angle_name, inflection_point, tolerance=tolerance
)
# Fit the first octant
# In this case it is OK to replace the memory reference, we will reinstate it
# later.
hypo_maker.update_params(ang_case1)
best_fit_info = self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
local_fit_kwargs["method"], local_fit_kwargs["method_kwargs"],
local_fit_kwargs["local_fit_kwargs"]
)
if not self.blindness:
logging.info(f"found best fit at angle {best_fit_info.params[angle_name].value}")
logging.info(f'checking other octant of {angle_name}')
if reset_free:
hypo_maker.reset_free()
else:
for param in minimizer_start_params:
hypo_maker.params[param.name].value = param.value
# Fit the second octant
hypo_maker.update_params(ang_case2)
new_fit_info = self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
local_fit_kwargs["method"], local_fit_kwargs["method_kwargs"],
local_fit_kwargs["local_fit_kwargs"]
)
if not self.blindness:
logging.info(f"found best fit at angle {new_fit_info.params[angle_name].value}")
# We must not forget to reset the range of the angle to its original value!
# Otherwise, the parameter returned by this function will have a different
# range, which can cause failures further down the line!
# This is one rare instance where we directly manipulate the parameters, so
# we re-hash.
best_fit_info._params[angle_name].range = deepcopy(ang_orig.range)
best_fit_info._rehash()
new_fit_info._params[angle_name].range = deepcopy(ang_orig.range)
new_fit_info._rehash()
# Take the one with the best fit
got_better = it_got_better(new_fit_info.metric_val, best_fit_info.metric_val, metric)
# TODO: Pass alternative fits up the chain
if got_better:
# alternate_fits.append(best_fit_info)
best_fit_info = new_fit_info
if not self.blindness:
logging.info('Accepting other-octant fit')
else:
# alternate_fits.append(new_fit_info)
if not self.blindness:
logging.info('Accepting initial-octant fit')
# Put the original angle parameter (as in, the actual object from memory)
# back into the hypo maker
hypo_maker.update_params(ang_orig)
# Copy the fitted parameter values from the best fit case into the hypo maker's
# parameter values.
# Also reinstate the original parameter range for the angle
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(hypo_maker, best_fit_info.params.free, update_range=True)
else:
update_param_values(hypo_maker, best_fit_info.params.free, update_range=True)
return best_fit_info
def _fit_best_of(self, data_dist, hypo_maker, metric,
external_priors_penalty, method_kwargs, local_fit_kwargs):
"""Run several manually configured fits and take the best one.
The specialty here is that `local_fit_kwargs` is a list, where each element
defines one fit.
"""
logging.info(f"running several manually configured fits to choose optimum")
reset_free = True
if method_kwargs is not None and "reset_free" in method_kwargs.keys():
reset_free = method_kwargs["reset_free"]
all_fit_results = []
for i, fit_kwargs in enumerate(local_fit_kwargs):
if reset_free:
hypo_maker.reset_free()
logging.info(f"Beginning fit {i+1} / {len(local_fit_kwargs)}")
new_fit_info = self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
fit_kwargs["method"], fit_kwargs["method_kwargs"],
fit_kwargs["local_fit_kwargs"]
)
all_fit_results.append(new_fit_info)
all_fit_metric_vals = [fit_info.metric_val for fit_info in all_fit_results]
# Take the one with the best fit
if is_metric_to_maximize(metric):
best_idx = np.argmax(all_fit_metric_vals)
else:
best_idx = np.argmin(all_fit_metric_vals)
logging.info(f"Found best fit being index {best_idx} with metric "
f"{all_fit_metric_vals[best_idx]}")
return all_fit_results[best_idx]
def _fit_condition(self, data_dist, hypo_maker, metric,
external_priors_penalty, method_kwargs, local_fit_kwargs):
"""Run one fit strategy or the other depending on a condition being true.
As in the constrained fit, the condition can be a callable or a string that
can be evaluated to a callable via `eval()`.
`local_fit_kwargs` has to be a list of length 2. The first fit runs if the
condition is true, the second one runs if the condition is false.
"""
assert "condition_func" in method_kwargs.keys()
assert len(local_fit_kwargs) == 2, ("need to fit specs, first runs if True, "
"second runs if false")
if type(method_kwargs["condition_func"]) is str:
logging.warning(EVAL_MSG)
condition_func = eval(method_kwargs["condition_func"])
assert callable(condition_func), "evaluated object is not a valid function"
elif callable(method_kwargs["condition_func"]):
condition_func = method_kwargs["condition_func"]
else:
raise ValueError("Condition function is neither a callable nor a "
"string that can be evaluated to a callable.")
if condition_func(hypo_maker):
logging.info("condition was TRUE, running first fit")
fit_kwargs = local_fit_kwargs[0]
else:
logging.info("condition was FALSE, running second fit")
fit_kwargs = local_fit_kwargs[1]
return self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
fit_kwargs["method"], fit_kwargs["method_kwargs"],
fit_kwargs["local_fit_kwargs"]
)
def _fit_grid_scan(self, data_dist, hypo_maker, metric,
external_priors_penalty, method_kwargs, local_fit_kwargs):
"""
Do a grid scan over starting positions and choose the best fit from the grid.
Alternatively, the parameters used for the grid can be fixed in the fit at each
grid point, and only the very best fit is then freed up to be refined.
"""
assert "grid" in method_kwargs.keys()
reset_free = True
if "reset_free" in method_kwargs.keys():
reset_free = method_kwargs["reset_free"]
fix_grid_params = False
if "fix_grid_params" in method_kwargs.keys():
fix_grid_params = method_kwargs["fix_grid_params"]
grid_params = list(method_kwargs["grid"].keys())
logging.info(f"Starting grid scan over parameters {grid_params}")
grid_1d_arrs = []
grid_units = []
for p in grid_params:
d_spec = method_kwargs["grid"][p]
logging.info(f"{p}: {d_spec}")
grid_1d_arrs.append(d_spec.m)
grid_units.append(d_spec.u)
scan_mesh = np.meshgrid(*grid_1d_arrs)
scan_mesh = [m * u for m, u in zip(scan_mesh, grid_units)]
if not fix_grid_params:
logging.info("This grid only defines the starting value of each fit, "
"all parameters that are free will stay free.")
else:
logging.info("The grid parameters will be fixed at each grid point.")
do_refined_fit = False
if ("refined_fit" in method_kwargs.keys()
and method_kwargs["refined_fit"] is not None):
do_refined_fit = True
logging.info("The best fit on the grid will be refined using "
f"{method_kwargs['refined_fit']['method']}")
if reset_free:
hypo_maker.reset_free()
# when we return from the scan, we want to set all parameters free again that
# were free to begin with
originally_free = hypo_maker.params.free.names
all_fit_results = []
grid_shape = scan_mesh[0].shape
for grid_idx in np.ndindex(grid_shape):
point = {name: mesh[grid_idx] for name, mesh in zip(grid_params, scan_mesh)}
logging.info(f"working on grid point {point}")
if reset_free:
hypo_maker.reset_free()
for param, value in point.items():
mod_param = deepcopy(hypo_maker.params[param])
mod_param.value = value
if fix_grid_params:
# it is possible to do a scan over fixed parameters as well as
# free ones; fixed ones always stay fixed, free ones are fixed
# if requested
mod_param.is_fixed = True
# It is important not to use hypo_maker.update_params(mod_param) here
# because we don't want to overwrite the memory reference!
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(hypo_maker, mod_param, update_is_fixed=True)
else:
update_param_values(hypo_maker, mod_param, update_is_fixed=True)
new_fit_info = self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
local_fit_kwargs["method"], local_fit_kwargs["method_kwargs"],
local_fit_kwargs["local_fit_kwargs"]
)
all_fit_results.append(new_fit_info)
for param in originally_free:
hypo_maker.params[param].is_fixed = False
all_fit_metric_vals = np.array([fit_info.metric_val for fit_info in all_fit_results])
all_fit_metric_vals = all_fit_metric_vals.reshape(grid_shape)
if not self.blindness:
logging.info(f"Grid scan metrics:\n{all_fit_metric_vals}")
# Take the one with the best fit
if is_metric_to_maximize(metric):
best_idx = np.argmax(all_fit_metric_vals)
best_idx_grid = np.unravel_index(best_idx, all_fit_metric_vals.shape)
else:
best_idx = np.argmin(all_fit_metric_vals)
best_idx_grid = np.unravel_index(best_idx, all_fit_metric_vals.shape)
logging.info(f"Found best fit being index {best_idx_grid} with metric "
f"{all_fit_metric_vals[best_idx_grid]}")
best_fit_result = all_fit_results[best_idx]
if do_refined_fit:
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(hypo_maker, best_fit_result.params.free)
else:
update_param_values(hypo_maker, best_fit_result.params.free)
# the params stored in the best fit may come from a grid point where
# parameters were fixed, so we free them up again
for param in originally_free:
hypo_maker.params[param].is_fixed = False
logging.info("Refining best fit result...")
# definitely don't want to reset the parameters here, that would defeate
# the entire purpose...
method_kwargs["refined_fit"]["reset_free"] = False
best_fit_result = self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
method_kwargs["refined_fit"]["method"],
method_kwargs["refined_fit"]["method_kwargs"],
method_kwargs["refined_fit"]["local_fit_kwargs"]
)
return best_fit_result
def _fit_constrained(self, data_dist, hypo_maker, metric,
external_priors_penalty, method_kwargs, local_fit_kwargs):
"""Run a fit subject to an arbitrary inequality constraint.
The constraint is given as a function that must stay positive. The value of
this function is scaled by a pre-factor and applied as a penalty to the test
statistic, where the initial scaling factor is not too large to avoid
minimizer problems. Should the fit converge to a point violating the
constraint, the penalty scale is doubled.
The constraining function should calculate the distance of the constraint
over-stepping in *rescaled* parameter space to make the over-all scale
uniform.
"""
assert "ineq_func" in method_kwargs.keys()
# If certain parameters aren't free, it will be impossible to satisfy the
# constraint and we would end up in an infinite loop! If we detect that
# these parameters aren't free, we just pass through the inner fit without
# adding a constraining penalty.
assert "necessary_free_params" in method_kwargs.keys()
if not set(method_kwargs["necessary_free_params"]).issubset(
set(hypo_maker.params.free.names)):
logging.info("Necessary parameters to satisfy the constraints aren't "
"free, running inner fit without constraint...")
return self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
local_fit_kwargs["method"], local_fit_kwargs["method_kwargs"],
local_fit_kwargs["local_fit_kwargs"]
)
if "starting_values" in method_kwargs.keys():
assert set(
method_kwargs["starting_values"].keys()
).issubset(set(method_kwargs["necessary_free_params"]))
logging.info("entering constrained fit...")
if type(method_kwargs["ineq_func"]) is str:
logging.warning(EVAL_MSG)
ineq_func = eval(method_kwargs["ineq_func"])
assert callable(ineq_func), "evaluated object is not a valid function"
elif callable(method_kwargs["ineq_func"]):
ineq_func = method_kwargs["ineq_func"]
else:
raise ValueError("Inequality function is neither a callable nor a "
"string that can be evaluated to a callable.")
def constraint_func(params):
value = ineq_func(params)
# inequality function must stay positive, so there is no penalty if
# it is positive, but otherwise we want to return a *positive* penalty
return 0. if value > 0. else -value
penalty = 1000.
if "minimum_penalty" in method_kwargs.keys():
penalty = method_kwargs["minimum_penalty"]
tol = 1e-4
if "constraint_tol" in method_kwargs.keys():
tol = method_kwargs["constraint_tol"]
penalty_sign = -1 if is_metric_to_maximize(metric) else 1
# It would be very inefficient to reset all free values each time when
# the penalty is doubled. However, we might still want to reset just once
# at the beginning of the constrained fit. We could still, if we wanted
# to, reset in the inner loop via the local_fit_kwargs.
reset_free = False
if "reset_free" in method_kwargs.keys():
reset_free = method_kwargs["reset_free"]
if reset_free:
hypo_maker.reset_free()
if external_priors_penalty is None:
penalty_func = lambda hypo_maker, metric: (
penalty_sign * penalty * constraint_func(params=hypo_maker.params)
)
else:
penalty_func = lambda hypo_maker, metric: (
penalty_sign * penalty * constraint_func(params=hypo_maker.params)
+ external_priors_penalty(hypo_maker=hypo_maker, metric=metric)
)
# emulating do-while loop
while True:
if "starting_values" in method_kwargs.keys():
for param, value in method_kwargs["starting_values"].items():
for pipeline in hypo_maker.pipelines:
if param in pipeline.params.names:
pipeline.params[param].value = value
fit_result = self.fit_recursively(
data_dist, hypo_maker, metric, penalty_func,
local_fit_kwargs["method"], local_fit_kwargs["method_kwargs"],
local_fit_kwargs["local_fit_kwargs"]
)
penalty *= 2
if constraint_func(fit_result.params) <= tol:
break
elif not self.blindness:
logging.info("Fit result violates constraint condition, re-running "
f"with new penalty multiplier: {penalty}")
return fit_result
def _fit_ranges(self, data_dist, hypo_maker, metric,
external_priors_penalty, method_kwargs, local_fit_kwargs):
"""Fit given ranges of a parameter separately."""
assert "param_name" in method_kwargs.keys()
assert "ranges" in method_kwargs.keys()
if not method_kwargs["param_name"] in hypo_maker.params.free.names:
logging.info(f"parameter {method_kwargs['param_name']} not free, "
"skipping fit over ranges...")
return self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
local_fit_kwargs["method"], local_fit_kwargs["method_kwargs"],
local_fit_kwargs["local_fit_kwargs"]
)
logging.info(f"entering fit over separate ranges in {method_kwargs['param_name']}")
reset_free = False
if "reset_free" in method_kwargs.keys():
reset_free = method_kwargs["reset_free"]
# Store a copy of the original parameter such that we can reset the ranges
# and nominal values after the fit is done.
original_param = deepcopy(hypo_maker.params[method_kwargs["param_name"]])
if not self.blindness:
logging.info(f"original parameter:\n{original_param}")
# this is the param we play around with (NOT same object in memory)
mod_param = deepcopy(original_param)
# The way this works is that we change the range and the set the rescaled
# value of the parameter to the same number it originally had. This means
# that, if the parameter was originally set at the lower end of the original
# range, it will now always start at the lower end of each interval to be
# fit separately. If it was in the middle, it will start in the middle of
# each interval.
original_rescaled_value = original_param._rescaled_value
all_fit_results = []
for i, interval in enumerate(method_kwargs["ranges"]):
mod_param.range = interval
mod_param._rescaled_value = original_rescaled_value
# to make sure that a `reset_free` command will not try to reset the
# parameter to a place outside of the modified range we also set the
# nominal value
mod_param.nominal_value = mod_param.value
logging.info(f"now fitting on interval {i+1}/{len(method_kwargs['ranges'])}")
if not self.blindness:
logging.info(f"parameter with modified range:\n{mod_param}")
# use update_param_values instead of hypo_maker.update_params so that we
# don't overwrite the internal memory reference
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(
hypo_maker, mod_param, update_range=True, update_nominal_values=True
)
else:
update_param_values(
hypo_maker, mod_param, update_range=True, update_nominal_values=True
)
fit_result = self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
local_fit_kwargs["method"], local_fit_kwargs["method_kwargs"],
local_fit_kwargs["local_fit_kwargs"]
)
all_fit_results.append(fit_result)
all_fit_metric_vals = [fit_info.metric_val for fit_info in all_fit_results]
# Take the one with the best fit
if is_metric_to_maximize(metric):
best_idx = np.argmax(all_fit_metric_vals)
else:
best_idx = np.argmin(all_fit_metric_vals)
if not self.blindness:
logging.info(f"Found best fit being in interval {best_idx+1} with metric "
f"{all_fit_metric_vals[best_idx]}")
best_fit_result = all_fit_results[best_idx]
# resetting the range of the parameter we played with
# This is one rare instance where we manipulate the parameters of a fit result.
best_fit_result._params[original_param.name].range = original_param.range
best_fit_result._params[original_param.name].nominal_value = original_param.nominal_value
best_fit_result._rehash()
# set the values of all parameters in the hypo_maker to the best fit values
# without overwriting the memory reference.
# Also reset ranges and nominal values that we might have changed above!
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(
hypo_maker, best_fit_result.params.free,
update_range=True, update_nominal_values=True
)
else:
update_param_values(
hypo_maker, best_fit_result.params.free,
update_range=True, update_nominal_values=True
)
return best_fit_result
def _fit_staged(self, data_dist, hypo_maker, metric,
external_priors_penalty, method_kwargs, local_fit_kwargs):
"""Run a staged fit of one or more sub-fits where later fits start where the
earlier fits finished.
The subsidiary fits are passed as a list of dicts to `local_fit_kwargs` and
are worked on in order of the list. Internally, the `nominal_values` of the
parameters are set to the best fit values of the previous fit, such that
calls to `reset_free` do not destroy the progress of previous stages.
"""
assert local_fit_kwargs is not None
assert isinstance(local_fit_kwargs, list) and len(local_fit_kwargs) > 1
logging.info("Starting staged fit...")
best_fit_params = None
best_fit_info = None
# storing original nominal values
original_nominal_values = dict(
[(p.name, p.nominal_value) for p in hypo_maker.params.free]
)
for i, fit_kwargs in enumerate(local_fit_kwargs):
logging.info(f"Beginning fit {i+1} / {len(local_fit_kwargs)}")
if best_fit_params is not None:
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(
hypo_maker, best_fit_params.free, update_nominal_values=True
)
else:
update_param_values(
hypo_maker, best_fit_params.free, update_nominal_values=True
)
best_fit_info = self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
fit_kwargs["method"], fit_kwargs["method_kwargs"],
fit_kwargs["local_fit_kwargs"]
)
best_fit_params = best_fit_info.params # makes a deepcopy anyway
# We set the nominal values to the best fit values, so that a `reset_free`
# call does not destroy the progress of the previous fit.
for p in best_fit_params.free:
p.nominal_value = p.value
# reset the nominal values to their original values as if nothing happened
# note that we manipulate the internal `_params` object directly, circumventing
# the getter method!
for p in best_fit_info._params.free:
p.nominal_value = original_nominal_values[p.name]
# Because we directly manipulated the internal parameters, we need to update
# the hash.
best_fit_info._rehash()
# Make sure that the hypo_maker has its params also at the best fit point
# with the original nominal parameter values.
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(
hypo_maker, best_fit_info.params.free, update_nominal_values=True
)
else:
update_param_values(
hypo_maker, best_fit_info.params.free, update_nominal_values=True
)
return best_fit_info
def _fit_scipy(self, data_dist, hypo_maker, metric,
external_priors_penalty, method_kwargs, local_fit_kwargs):
"""Run an arbitrary scipy minimizer to modify hypo dist maker's free params
until the data_dist is most likely to have come from this hypothesis.
This function uses only local optimization and does not attempt to find
a global optimum among several local optima.
Parameters
----------
data_dist : MapSet or List of MapSets
Data distribution(s)
hypo_maker : Detectors, DistributionMaker or convertible thereto
metric : string or iterable of strings
minimizer_settings : dict
external_priors_penalty : func
User defined prior penalty function
Returns
-------
fit_info : HypoFitResult
"""
global_scipy_methods = ["differential_evolution", "basinhopping",
"dual_annealing", "shgo"]
methods_using_local_fits = ["basinhopping", "dual_annealing", "shgo"]
global_method = None
if "global_method" in method_kwargs.keys():
global_method = method_kwargs["global_method"]
if local_fit_kwargs is not None and global_method not in methods_using_local_fits:
logging.warning(f"local_fit_kwargs are ignored by global method {global_method}")
if global_method is None:
logging.info(f"entering local scipy fit using {method_kwargs['method']['value']}")
else:
if not global_method in global_scipy_methods:
raise ValueError(f"Unsupported global fit method {global_method}")
logging.info(f"entering global scipy fit using {global_method}")
if not self.blindness:
logging.debug("free parameters:")
logging.debug(hypo_maker.params.free)
if global_method in methods_using_local_fits:
if local_fit_kwargs is not None:
minimizer_settings = set_minimizer_defaults(local_fit_kwargs)
validate_minimizer_settings(minimizer_settings)
else:
# We put this here such that the checks farther down don't crash
minimizer_settings = {"method": {"value": "none"}, "options": {"value": {}}}
elif global_method == "differential_evolution":
# unfortunately we are not allowed to configure local min.
opt = method_kwargs["options"]
if local_fit_kwargs is not None and "constraints" in local_fit_kwargs["options"]["value"]:
raise RuntimeError("Pass constraints to differential evolution only via method_kwargs!")
# polish is default
if "constraints" in opt and opt["constraints"]:
# When we detect constraints (which are not empty or None),
# we assume polishing is requested (constraints useless otherwise)
opt["polish"] = True
if "polish" in opt and opt["polish"]:
if "constraints" in opt:
if opt["constraints"] is None:
opt["constraints"] = []
if opt["constraints"]:
logging.info(
"Differential evolution result is forced to be polished "
"with trust-constr."
)
minimizer_settings = {
"method": {"value": "trust-constr"},
"options": {"value": {"constraints": opt.pop("constraints")}}
}
else:
logging.info(
"Differential evolution result is forced to be polished "
"with L-BFGS-B."
)
minimizer_settings = {
"method": {"value": "l-bfgs-b"},
"options": {"value": {"eps": 1e-8}}
}
else:
# Remove any possible constraints key which is empty
opt.pop("constraints", None)
# We put this here such that the checks farther down don't crash
minimizer_settings = {"method": {"value": "none"}, "options": {"value": {}}}
else:
minimizer_settings = set_minimizer_defaults(method_kwargs)
validate_minimizer_settings(minimizer_settings)
if isinstance(metric, str):
metric = [metric]
sign = 0
for m in metric:
if m in METRICS_TO_MAXIMIZE and sign != +1:
sign = -1
elif m in METRICS_TO_MINIMIZE and sign != -1:
sign = +1
else:
raise ValueError("Defined metrics are not compatible")
# Get starting free parameter values
x0 = np.array(hypo_maker.params.free._rescaled_values) # pylint: disable=protected-access
# Indicate indices where x0 should be reflected around the mid-point at 0.5.
# This is only used for the COBYLA minimizer.
flip_x0 = np.zeros(len(x0), dtype=bool)
minimizer_method = minimizer_settings["method"]["value"].lower()
if ("constraints" in minimizer_settings["options"]["value"]
and minimizer_settings["options"]["value"]["constraints"]):
# convert user-specified equality or inequality constraint expressions
scipy_constraints_to_callables(minimizer_settings["options"]["value"]["constraints"], hypo_maker)
constrs = minimizer_settings["options"]["value"].pop("constraints")
else:
constrs = []
if minimizer_method == 'cobyla' and parse_version(scipy.__version__) < parse_version('1.11.0'):
logging.warning(
'Minimizer %s requires bounds to be formulated in terms of constraints.'
' Constraining functions are auto-generated now.',
minimizer_method
)
bound_constrs = []
for idx in range(len(x0)):
fl = lambda x, i=idx: x[i] - FTYPE_PREC # lower bound at zero
l = make_scipy_constraint_dict(constr_type='ineq', fun=fl)
fu = lambda x, i=idx: 1. - x[i] # upper bound at 1
u = make_scipy_constraint_dict(constr_type='ineq', fun=fu)
bound_constrs.append(l)
bound_constrs.append(u)
constrs += bound_constrs
# The minimizer begins with a step of size `rhobeg` in the positive
# direction. Flipping around 0.5 ensures that this initial step will not
# overstep boundaries if `rhobeg` is 0.5.
flip_x0 = np.array(x0) > 0.5
# The minimizer can't handle bounds, but they still need to be passed for
# the interface to be uniform even though they are not used.
bounds = [(0, 1)]*len(x0)
x0 = np.where(flip_x0, 1 - x0, x0)
if global_method is None:
logging.debug('Running the %s minimizer...', minimizer_method)
else:
logging.debug(f"Running the {global_method} global fit method...")
# Using scipy.optimize.minimize allows a whole host of minimizers to be
# used.
counter = Counter()
fit_history = []
fit_history.append(list(metric) + [v.name for v in hypo_maker.params.free])
start_t = time.time()
if self.pprint and not self.blindness:
free_p = hypo_maker.params.free
self._pprint_header(free_p, external_priors_penalty, metric)
# reset number of iterations before each minimization
self._nit = 0
# Before starting minimization, check if we already have a perfect match between data and template
# This can happen if using pseudodata that was generated with the nominal values for parameters
# (which will also be the initial values in the fit) and blah...
# If this is the case, don't both to fit and return results right away.
# Grab the hypo map
hypo_asimov_dist = hypo_maker.get_outputs(return_sum=True)
# Check if the hypo matches data
matches = False
if isinstance(data_dist, list):
matches = all( entry.allclose(hypo_asimov_dist[ie]) for ie, entry in enumerate(data_dist) )
else:
matches = data_dist.allclose(hypo_asimov_dist)
if matches:
msg = 'Initial hypo matches data, no need for fit'
logging.info(msg)
# Get the metric value at this initial point (for the returned data)
initial_metric_val = (
data_dist.metric_total(expected_values=hypo_asimov_dist, metric=metric[0])
+ hypo_maker.params.priors_penalty(metric=metric[0])
)
# Return fit results, even though didn't technically fit
return HypoFitResult(
metric,
initial_metric_val,
data_dist,
hypo_maker,
minimizer_time=0.,
minimizer_metadata={"success":True, "nit":0, "message":msg}, # Add some metadata in the format returned by `scipy.optimize.minimize`
fit_history=None,
other_metrics=None,
num_distributions_generated=0,
include_detailed_metric_info=True,
)
#
# From that point on, optimize starts using the metric and
# iterates, no matter what you do
#
if global_method is None:
optimize_result = optimize.minimize(
fun=self._minimizer_callable,
x0=x0,
args=(hypo_maker, data_dist, metric, counter, fit_history,
flip_x0, external_priors_penalty),
bounds=bounds,
constraints=constrs,
method=minimizer_settings['method']['value'],
options=minimizer_settings['options']['value'],
callback=self._minimizer_callback
)
elif global_method == "differential_evolution":
optimize_result = optimize.differential_evolution(
func=self._minimizer_callable,
bounds=bounds,
constraints=[old_constraint_to_new(i, cd) for i, cd in enumerate(constrs)],
args=(hypo_maker, data_dist, metric, counter, fit_history,
flip_x0, external_priors_penalty),
callback=self._minimizer_callback,
**method_kwargs["options"]
)
elif global_method == "basinhopping":
opt = method_kwargs["options"]
if "seed" in opt:
seed = opt["seed"]
else:
seed = None
rng = check_random_state(seed)
if "stepsize" in opt:
stepsize = opt["stepsize"]
else:
stepsize = 0.5
take_step = BoundedRandomDisplacement(stepsize, bounds, rng)
if minimizer_method != "none":
minimizer_kwargs = make_scipy_local_minimizer_kwargs(
minimizer_settings=minimizer_settings,
constrs=constrs,
bounds=bounds
)
else:
minimizer_kwargs = {"bounds": bounds}
# we need to pass args and bounds in any case as part of minimizer_kwargs
minimizer_kwargs["args"] = (
hypo_maker, data_dist, metric, counter, fit_history,
flip_x0, external_priors_penalty
)
if "reset_free" in minimizer_kwargs:
del minimizer_kwargs["reset_free"]
def basinhopping_callback(x, f, accept):
self._nit += 1
optimize_result = optimize.basinhopping(
func=self._minimizer_callable,
x0=x0,
take_step=take_step,
callback=basinhopping_callback,
minimizer_kwargs=minimizer_kwargs,
**method_kwargs["options"]
)
elif global_method == "dual_annealing":
minimizer_kwargs = make_scipy_local_minimizer_kwargs(
minimizer_settings=minimizer_settings,
constrs=constrs,
bounds=bounds
)
if "reset_free" in minimizer_kwargs:
del minimizer_kwargs["reset_free"]
def annealing_callback(x, f, context):
self._nit += 1
optimize_result = optimize.dual_annealing(
func=self._minimizer_callable,
bounds=bounds,
x0=x0,
args=(hypo_maker, data_dist, metric, counter, fit_history,
flip_x0, external_priors_penalty),
minimizer_kwargs=minimizer_kwargs if minimizer_method != "none" else {},
callback=annealing_callback,
**method_kwargs["options"]
)
elif global_method == "shgo":
minimizer_kwargs = make_scipy_local_minimizer_kwargs(
minimizer_settings=minimizer_settings,
constrs=constrs,
bounds=bounds
)
if "reset_free" in minimizer_kwargs:
del minimizer_kwargs["reset_free"]
if minimizer_kwargs["method"] != "none":
logging.warning(
f"Due to a scipy bug, shgo will ignore many local minimiser "
"options. This will most likely result in unreliable "
"behaviour or even crashes. Refer to "
"https://github.com/scipy/scipy/issues/20028."
)
optimize_result = optimize.shgo(
func=self._minimizer_callable,
bounds=bounds,
args=(hypo_maker, data_dist, metric, counter, fit_history,
flip_x0, external_priors_penalty),
callback=self._minimizer_callback,
minimizer_kwargs=minimizer_kwargs if minimizer_method != "none" else {},
**method_kwargs["options"]
)
end_t = time.time()
if self.pprint:
# clear the line
sys.stdout.write('\n\n')
sys.stdout.flush()
minimizer_time = end_t - start_t
# Check for minimization failure
if not optimize_result.success:
if self.blindness:
msg = ''
else:
msg = ' ' + str(optimize_result.message)
logging.warning('Optimization failed.' + msg)
# Instead of crashing completely, return a fit result with an infinite
# test statistic value.
metadata = {"success":optimize_result.success, "message":optimize_result.message,}
return HypoFitResult(
metric,
sign * np.inf,
data_dist,
hypo_maker,
minimizer_time=minimizer_time,
minimizer_metadata=metadata,
fit_history=fit_history,
other_metrics=None,
num_distributions_generated=counter.count,
include_detailed_metric_info=False,
)
logging.info(
'Total time to optimize: %8.4f s; # of dists generated: %6d;'
' avg dist gen time: %10.4f ms',
minimizer_time, counter.count, minimizer_time*1000./counter.count
)
# Will not assume that the minimizer left the hypo maker in the
# minimized state, so set the values now (also does conversion of
# values from [0,1] back to physical range)
rescaled_pvals = optimize_result.pop('x')
rescaled_pvals = np.where(flip_x0, 1 - rescaled_pvals, rescaled_pvals)
hypo_maker._set_rescaled_free_params(rescaled_pvals) # pylint: disable=protected-access
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(hypo_maker, hypo_maker.params.free) #updates values for ALL detectors
# Get the best-fit metric value
metric_val = sign * optimize_result.pop('fun')
# Record minimizer metadata (all info besides 'x' and 'fun'; also do
# not record some attributes if performing blinded analysis)
metadata = OrderedDict()
for k in sorted(optimize_result.keys()):
if self.blindness and k in ['jac', 'hess', 'hess_inv']:
continue
if k=='hess_inv':
continue
if k=="message" and isinstance(optimize_result[k], bytes):
# A little fix for deserialization: After serialization and
# deserialization, the string would be decoded anyway and then the
# recovered object would look different.
metadata[k] = optimize_result[k].decode('utf-8')
continue
metadata[k] = optimize_result[k]
if self.blindness > 1: # only at stricter blindness level
# undo flip
x0 = np.where(flip_x0, 1 - x0, x0)
# Reset to starting value of the fit, rather than nominal values because
# the nominal value might be out of range if this is inside an octant check.
hypo_maker._set_rescaled_free_params(x0)
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(hypo_maker, hypo_maker.params.free) #updates values for ALL detectors
# TODO: other metrics
fit_info = HypoFitResult(
metric,
metric_val,
data_dist,
hypo_maker,
minimizer_time=minimizer_time,
minimizer_metadata=metadata,
fit_history=fit_history,
other_metrics=None,
num_distributions_generated=counter.count,
include_detailed_metric_info=True,
)
if not self.blindness:
logging.info(f"found best fit: {fit_info.params.free}")
return fit_info
def _fit_iminuit(self, data_dist, hypo_maker, metric,
external_priors_penalty, method_kwargs, local_fit_kwargs):
"""Run the Minuit minimizer to modify hypo dist maker's free params
until the data_dist is most likely to have come from this hypothesis.
This function uses only local optimization and does not attempt to find
a global optimum among several local optima.
Parameters
----------
data_dist : MapSet or List of MapSets
Data distribution(s)
hypo_maker : Detectors, DistributionMaker or convertible thereto
metric : string or iterable of strings
external_priors_penalty : func
User defined prior penalty function
method_kwargs : dict
Options passed on for Minuit
local_fit_kwargs : dict
Ignored since no local fit happens inside this fit
Returns
-------
fit_info : HypoFitResult
"""
logging.info("Entering local fit using Minuit")
if local_fit_kwargs is not None:
logging.warning("Local fit kwargs are ignored by 'fit_minuit'."
"Use 'method_kwargs' to set Minuit options.")
if method_kwargs is None:
method_kwargs = {} # use all defaults
if not self.blindness:
logging.debug("free parameters:")
logging.debug(hypo_maker.params.free)
if isinstance(metric, str):
metric = [metric]
sign = 0
for m in metric:
if m in METRICS_TO_MAXIMIZE and sign != +1:
sign = -1
elif m in METRICS_TO_MINIMIZE and sign != -1:
sign = +1
else:
raise ValueError("Defined metrics are not compatible")
# Get starting free parameter values
x0 = np.array(hypo_maker.params.free._rescaled_values) # pylint: disable=protected-access
bounds = [(0, 1)]*len(x0)
counter = Counter()
fit_history = []
fit_history.append(list(metric) + [v.name for v in hypo_maker.params.free])
start_t = time.time()
if self.pprint and not self.blindness:
free_p = hypo_maker.params.free
self._pprint_header(free_p, external_priors_penalty, metric)
# reset number of iterations before each minimization
self._nit = 0
# we never flip in minuit, but we still need to set it
flip_x0 = np.zeros(len(x0), dtype=bool)
args=(hypo_maker, data_dist, metric, counter, fit_history,
flip_x0, external_priors_penalty)
def loss_func(x):
# In rare circumstances, minuit will try setting one of the parameters
# to NaN. Minuit might be able to recover when we return NaN.
if np.any(~np.isfinite(x)):
logging.warning(f"Minuit tried evaluating at invalid parameters: {x}")
return np.nan
return self._minimizer_callable(x, *args)
m = Minuit(loss_func, x0)
m.limits = bounds
# only initial step size, not very important
if "errors" in method_kwargs.keys():
m.errors = method_kwargs["errors"]
# Precision with which the likelihood is calculated
if "precision" in method_kwargs.keys():
m.precision = method_kwargs["precision"]
else:
# Documentation states that this value should be set to "some multiple of
# the smallest relative change of a parameter that still changes the
# function".
m.precision = 5 * FTYPE_PREC
if "tol" in method_kwargs.keys():
m.tol = method_kwargs["tol"]
simplex = False
if "run_simplex" in method_kwargs.keys():
simplex = method_kwargs["run_simplex"]
migrad = True
if "run_migrad" in method_kwargs.keys():
migrad = method_kwargs["run_migrad"]
if not (migrad or simplex):
raise ValueError("Must select at least one of MIGRAD or SIMPLEX to run")
# Minuit needs to know if the loss function is interpretable as a likelihood
# or as a least-squares loss. It influences the stopping condition where the
# estimated uncertainty on parameters is small compared to their covariance.
if metric[0] in LLH_METRICS:
m.errordef = Minuit.LIKELIHOOD
elif metric[0] in CHI2_METRICS:
m.errordef = Minuit.LEAST_SQUARES
else:
raise ValueError("Metric neither LLH or CHI2, unknown error definition.")
# Minuit can sometimes try to evaluate at NaN parameters if the liklihood
# is badly behaved. We don't want to completely crash in that case.
m.throw_nan = False
# actually run the minimization!
if simplex:
logging.info("Running SIMPLEX")
m.simplex()
if migrad:
logging.info("Running MIGRAD")
m.migrad()
end_t = time.time()
if self.pprint:
# clear the line
sys.stdout.write('\n\n')
sys.stdout.flush()
minimizer_time = end_t - start_t
logging.info(
'Total time to optimize: %8.4f s; # of dists generated: %6d;'
' avg dist gen time: %10.4f ms',
minimizer_time, counter.count, minimizer_time*1000./counter.count
)
if not m.accurate:
logging.warning("Covariance matrix invalid.")
if not m.valid:
logging.warning("Minimum not valid according to Minuit's criteria.")
# Will not assume that the minimizer left the hypo maker in the
# minimized state, so set the values now (also does conversion of
# values from [0,1] back to physical range)
rescaled_pvals = np.array(m.values)
hypo_maker._set_rescaled_free_params(rescaled_pvals) # pylint: disable=protected-access
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(hypo_maker, hypo_maker.params.free) #updates values for ALL detectors
# Get the best-fit metric value
metric_val = sign * m.fval
# Record minimizer metadata (all info besides 'x' and 'fun'; also do
# not record some attributes if performing blinded analysis)
metadata = OrderedDict()
# param names are relevant because they allow one to reconstruct which
# parameter corresponds to which entry in the covariance matrix
metadata["param_names"] = hypo_maker.params.free.names
# The criteria to deem a minimum "valid" are too strict for our purposes, so
# we accept the value even if m.valid is False.
metadata["success"] = np.isfinite(metric_val)
metadata["valid"] = m.valid
metadata["accurate"] = m.accurate
metadata["edm"] = m.fmin.edm
metadata["edm_goal"] = m.fmin.edm_goal
metadata["has_reached_call_limit"] = m.fmin.has_reached_call_limit
metadata["has_parameters_at_limit"] = m.fmin.has_parameters_at_limit
metadata["nit"] = m.nfcn
metadata["message"] = "Minuit finished."
if not self.blindness:
if self.blindness < 2:
metadata["rescaled_values"] = np.array(m.values)
else:
metadata["rescaled_values"] = np.full(len(m.values), np.nan)
if m.accurate:
metadata["hess_inv"] = np.array(m.covariance)
else:
metadata["hess_inv"] = np.full((len(x0), len(x0)), np.nan)
if self.blindness > 1: # only at stricter blindness level
# undo flip
x0 = np.where(flip_x0, 1 - x0, x0)
# Reset to starting value of the fit, rather than nominal values because
# the nominal value might be out of range if this is inside an octant check.
hypo_maker._set_rescaled_free_params(x0)
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(hypo_maker, hypo_maker.params.free) #updates values for ALL detectors
# TODO: other metrics
fit_info = HypoFitResult(
metric,
metric_val,
data_dist,
hypo_maker,
minimizer_time=minimizer_time,
minimizer_metadata=metadata,
fit_history=fit_history,
other_metrics=None,
num_distributions_generated=counter.count,
include_detailed_metric_info=True,
)
if not self.blindness:
logging.info(f"found best fit: {fit_info.params.free}")
return fit_info
def _fit_nlopt(self, data_dist, hypo_maker, metric,
external_priors_penalty, method_kwargs, local_fit_kwargs):
"""Run any of the (gradient-free) NLOPT optimizers to modify hypo dist maker's
free params until the data_dist is most likely to have come from this
hypothesis.
Parameters
----------
data_dist : MapSet or List of MapSets
Data distribution(s)
hypo_maker : Detectors, DistributionMaker or convertible thereto
metric : string or iterable of strings
external_priors_penalty : func
User defined prior penalty function
method_kwargs : dict
Options passed on for NLOPT
local_fit_kwargs : dict
Ignored since no local fit happens inside this fit
Returns
-------
fit_info : HypoFitResult
"""
logging.info("Entering fit using NLOPT")
if local_fit_kwargs is not None:
logging.warning("`local_fit_kwargs` are ignored by 'fit_nlopt'."
"Use `method_kwargs` to set nlopt options and use "
"`method_kwargs['local_optimizer']` to define the settings of "
"a subsidiary NLOPT optimizer.")
if method_kwargs is None:
raise ValueError("Need to specify at least the algorithm to run.")
if not self.blindness:
logging.debug("free parameters:")
logging.debug(hypo_maker.params.free)
if isinstance(metric, str):
metric = [metric]
sign = 0
for m in metric:
if m in METRICS_TO_MAXIMIZE and sign != +1:
sign = -1
elif m in METRICS_TO_MINIMIZE and sign != -1:
sign = +1
else:
raise ValueError("Defined metrics are not compatible")
# Get starting free parameter values
x0 = np.array(hypo_maker.params.free._rescaled_values) # pylint: disable=protected-access
counter = Counter()
fit_history = []
fit_history.append(list(metric) + [v.name for v in hypo_maker.params.free])
start_t = time.time()
if self.pprint and not self.blindness:
free_p = hypo_maker.params.free
self._pprint_header(free_p, external_priors_penalty, metric)
# reset number of iterations before each minimization
self._nit = 0
# we never flip in nlopt, but we still need to set it
flip_x0 = np.zeros(len(x0), dtype=bool)
args=(hypo_maker, data_dist, metric, counter, fit_history,
flip_x0, external_priors_penalty)
def loss_func(x, grad):
if np.any(~np.isfinite(x)):
logging.warning(f"NLOPT tried evaluating at invalid parameters: {x}")
return np.nan
if grad.size > 0:
raise RuntimeError("Gradients cannot be calculated, use a gradient-free"
" optimization routine instead.")
return self._minimizer_callable(x, *args)
opt = self._define_nlopt_opt(method_kwargs, loss_func, hypo_maker)
# For some stochastic optimization methods such as CRS2, a seed parameter may
# be used to make the optimization deterministic. Otherwise, nlopt will use a
# random seed based on the current system time.
if "seed" in method_kwargs:
nlopt.srand(method_kwargs["seed"])
logging.info(f"Starting optimization using {opt.get_algorithm_name()}")
xopt = opt.optimize(x0)
end_t = time.time()
if self.pprint:
# clear the line
sys.stdout.write('\n\n')
sys.stdout.flush()
minimizer_time = end_t - start_t
logging.info(
'Total time to optimize: %8.4f s; # of dists generated: %6d;'
' avg dist gen time: %10.4f ms',
minimizer_time, counter.count, minimizer_time*1000./counter.count
)
# Will not assume that the minimizer left the hypo maker in the
# minimized state, so set the values now (also does conversion of
# values from [0,1] back to physical range)
rescaled_pvals = xopt
hypo_maker._set_rescaled_free_params(rescaled_pvals) # pylint: disable=protected-access
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(hypo_maker, hypo_maker.params.free) #updates values for ALL detectors
# Get the best-fit metric value
metric_val = sign * opt.last_optimum_value()
# Record minimizer metadata (all info besides 'x' and 'fun'; also do
# not record some attributes if performing blinded analysis)
metadata = OrderedDict()
nlopt_result = opt.last_optimize_result()
# Positive return values are successes, negative return values are failures.
# see https://nlopt.readthedocs.io/en/latest/NLopt_Reference/#return-values
metadata["success"] = nlopt_result > 0
metadata["nlopt_result"] = nlopt_result
metadata["nit"] = opt.get_numevals()
metadata["message"] = {
1: "NLOPT_SUCCESS",
2: "NLOPT_STOPVAL_REACHED",
3: "NLOPT_FTOL_REACHED",
4: "NLOPT_XTOL_REACHED",
5: "NLOPT_MAXEVAL_REACHED",
6: "NLOPT_MAXTIME_REACHED",
-1: "NLOPT_FAILURE",
-2: "NLOPT_INVALID_ARGS",
-3: "NLOPT_OUT_OF_MEMORY",
-4: "NLOPT_ROUNDOFF_LIMITED",
-5: "NLOPT_FORCED_STOP"
}[nlopt_result]
if self.blindness < 2:
metadata["rescaled_values"] = rescaled_pvals
else:
metadata["rescaled_values"] = np.full(len(m.values), np.nan) #FIXME: m.values?
# we don't get a Hessian from nlopt
metadata["hess_inv"] = np.full((len(x0), len(x0)), np.nan)
if self.blindness > 1: # only at stricter blindness level
hypo_maker._set_rescaled_free_params(x0)
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(hypo_maker, hypo_maker.params.free) #updates values for ALL detectors
# TODO: other metrics
fit_info = HypoFitResult(
metric,
metric_val,
data_dist,
hypo_maker,
minimizer_time=minimizer_time,
minimizer_metadata=metadata,
fit_history=fit_history,
other_metrics=None,
num_distributions_generated=counter.count,
include_detailed_metric_info=True,
)
if not self.blindness:
logging.info(f"found best fit: {fit_info.params.free}")
return fit_info
def _define_nlopt_opt(self, method_kwargs, loss_func, hypo_maker):
"""
Helper function that reads the options from a dictionary and configures
an nlopt.opt object with all the options applied. Some global search algorithms
also need a local/subsidiary optimizer. Its options can be specified in
`method_kwargs['local_optimizer']` as a dictionary of the same form that is
passed to this function again to build the nlopt.opt object.
"""
if not "algorithm" in method_kwargs.keys():
raise ValueError("Need to specify the algorithm to use.")
alg_name_splits = method_kwargs["algorithm"].split("_")
if not alg_name_splits[0] == "NLOPT":
raise ValueError("Algorithm name should be specified as `NLOPT_{G,L}N_XXX`")
if len(alg_name_splits[1]) > 1 and alg_name_splits[1][1] == "D":
raise ValueError("Only gradient-free algorithms (NLOPT_GN or NLOPT_LN) are "
"supported.")
algorithm = getattr(nlopt, "_".join(alg_name_splits[1:]))
x0 = np.array(hypo_maker.params.free._rescaled_values)
opt = nlopt.opt(algorithm, len(x0))
opt.set_min_objective(loss_func)
if "ftol_abs" in method_kwargs.keys():
opt.set_ftol_abs(method_kwargs["ftol_abs"])
if "ftol_rel" in method_kwargs.keys():
opt.set_ftol_rel(method_kwargs["ftol_rel"])
if "xtol_abs" in method_kwargs.keys():
opt.set_xtol_abs(method_kwargs["xtol_abs"])
if "xtol_rel" in method_kwargs.keys():
opt.set_xtol_rel(method_kwargs["xtol_rel"])
if "stopval" in method_kwargs.keys():
opt.set_stopval(method_kwargs["stopval"])
if "maxeval" in method_kwargs.keys():
opt.set_maxeval(method_kwargs["maxeval"])
# Maximum runtime in seconds
if "maxtime" in method_kwargs.keys():
opt.set_maxtime(method_kwargs["maxtime"])
# set algorithm-specific parameters (see
# https://nlopt.readthedocs.io/en/latest/NLopt_Reference/#algorithm-specific-parameters)
if "algorithm_params" in method_kwargs.keys():
for k, v in method_kwargs["algorithm_params"].items():
opt.set_param(k, v)
if "ineq_constraints" in method_kwargs.keys():
ineq_funcs = get_nlopt_inequality_constraint_funcs(method_kwargs, hypo_maker)
for ineq_func in ineq_funcs:
opt.add_inequality_constraint(ineq_func)
# Population size for stochastic search algorithms, see
# https://nlopt.readthedocs.io/en/latest/NLopt_Reference/#stochastic-population
if "population" in method_kwargs.keys():
opt.set_population(method_kwargs["population"])
if "initial_step" in method_kwargs.keys():
opt.set_initial_step(method_kwargs["initial_step"])
opt.set_lower_bounds(0.)
opt.set_upper_bounds(1.)
if "local_optimizer" in method_kwargs.keys():
local_opt = self._define_nlopt_opt(method_kwargs["local_optimizer"],
loss_func, hypo_maker)
opt.set_local_optimizer(local_opt)
return opt
def _pprint_header(self, free_p, external_priors_penalty, metric):
# Display any units on top
r = re.compile(r'(^[+0-9.eE-]* )|(^[+0-9.eE-]*$)')
hdr = ' '*(6+1+10+1+12+3)
unt = []
for p in free_p:
u = r.sub('', format(p.value, '~')).replace(' ', '')[0:10]
if u:
u = '(' + u + ')'
unt.append(u.center(12))
hdr += ' '.join(unt)
hdr += '\n'
# Header names
hdr += ('iter'.center(6) + ' ' + 'funcalls'.center(10) + ' ' +
metric[0][0:12].center(12) + ' | ')
hdr += ' '.join([p.name[0:12].center(12) for p in free_p])
if external_priors_penalty is not None:
hdr += " | penalty "
hdr += '\n'
# Underscores
hdr += ' '.join(['-'*6, '-'*10, '-'*12, '+'] + ['-'*12]*len(free_p))
if external_priors_penalty is not None:
hdr += " + -----------"
hdr += '\n'
sys.stdout.write(hdr)
def _minimizer_callable(self, scaled_param_vals, hypo_maker, data_dist,
metric, counter, fit_history, flip_x0,
external_priors_penalty=None):
"""Simple callback for use by scipy.optimize minimizers.
This should *not* in general be called by users, as `scaled_param_vals`
are stripped of their units and scaled to the range [0, 1], and hence
some validation of inputs is bypassed by this method.
Parameters
----------
scaled_param_vals : sequence of floats
If called from a scipy.optimize minimizer, this sequence is
provieded by the minimizer itself. These values are all expected to
be in the range [0, 1] and be simple floats (no units or
uncertainties attached, etc.). Rescaling the parameter values to
their original (physical) ranges (including units) is handled
within this method.
hypo_maker : Detectors or DistributionMaker
Creates the per-bin expectation values per map
based on its param values. Free params in the
`hypo_maker` are modified by the minimizer to achieve a "best" fit.
data_dist : Sequence of MapSets or MapSet
Data distribution to be fit. Can be an actual-, Asimov-, or
pseudo-data distribution (where the latter two are derived from
simulation and so aren't technically "data").
metric : string or iterable of strings
Metric by which to evaluate the fit. See Map
counter : Counter
Mutable object to keep track--outside this method--of the number of
times this method is called.
flip_x0 : ndarray of type bool
Indicates which indices of x0 should be flipped around 0.5.
external_priors_penalty : func
User defined prior penalty function, which takes `hypo_maker` and
`metric` as arguments and returns numerical value of penalty to
the metric value. It is expected sign of the penalty is correctly
specified inside the `external_priors_penalty` (e.g. negative for
llh or positive for chi2).
"""
# Want to *maximize* e.g. log-likelihood but we're using a minimizer,
# so flip sign of metric in those cases.
if isinstance(metric, str):
metric = [metric]
sign = 0
for m in metric:
if m in METRICS_TO_MAXIMIZE and sign != +1:
sign = -1
elif m in METRICS_TO_MINIMIZE and sign != -1:
sign = +1
else:
raise ValueError('Defined metrics are not compatible')
scaled_param_vals = np.where(flip_x0, 1 - scaled_param_vals, scaled_param_vals)
# Set param values from the scaled versions the minimizer works with
hypo_maker._set_rescaled_free_params(scaled_param_vals) # pylint: disable=protected-access
if hypo_maker.__class__.__name__ == "Detectors":
update_param_values_detector(hypo_maker, hypo_maker.params.free) #updates values for ALL detectors
# Get the map set
try:
if metric[0] == 'generalized_poisson_llh':
raise NotImplementedError(
"generalized_poisson_llh isn't correctly implemented any longer!"
)
# FIXME: these `output_mode` and `force_standard_output` kwargs were removed in
# https://github.com/icecube/pisa/commit/7a4e875aa7bdc52ea64a5270e9808d866d1395f3
hypo_asimov_dist = hypo_maker.get_outputs(return_sum=False, output_mode='binned', force_standard_output=False)
hypo_asimov_dist = merge_mapsets_together(mapset_list=hypo_asimov_dist)
data_dist = data_dist.maps[0] # Extract the map from the MapSet
metric_kwargs = {'empty_bins':hypo_maker.empty_bin_indices}
else:
hypo_asimov_dist = hypo_maker.get_outputs(return_sum=True)
# TODO: can be removed? (see same commit as above)
if isinstance(hypo_asimov_dist, OrderedDict):
hypo_asimov_dist = hypo_asimov_dist['weights']
metric_kwargs = {}
except Exception as e:
if self.blindness:
logging.error('Minimizer failed')
else:
logging.error(
'Failed to generate distribution with free'
' params %s', hypo_maker.params.free
)
logging.error(str(e))
raise
#
# Assess the fit: whether the data came from the hypo_asimov_dist
#
try:
if hypo_maker.__class__.__name__ == "Detectors":
metric_val = 0
for i in range(len(hypo_maker.distribution_makers)):
data = data_dist[i].metric_total(expected_values=hypo_asimov_dist[i],
metric=metric[i], metric_kwargs=metric_kwargs)
metric_val += data
priors = hypo_maker.params.priors_penalty(metric=metric[0]) # uses just the "first" metric for prior
metric_val += priors
elif isinstance(hypo_asimov_dist, list): # DistributionMaker object with variable binning
metric_val = 0
for i in range(len(hypo_asimov_dist)):
metric_val += data_dist[i].metric_total(expected_values=hypo_asimov_dist[i],
metric=metric[0], metric_kwargs=metric_kwargs)
metric_val += hypo_maker.params.priors_penalty(metric=metric[0])
else: # DistributionMaker object with regular binning
if metric[0] == 'weighted_chi2':
actual_values = data_dist.hist['total']
expected_values = hypo_asimov_dist.hist['total']
d = {'output_binning': hypo_maker.pipelines[0].output_binning,
'output_key': 'bin_unc2'}
bin_unc2 = hypo_maker.get_outputs(return_sum=True, **d).hist['total']
metric_val = (
np.sum(weighted_chi2(actual_values, expected_values, bin_unc2))
+ hypo_maker.params.priors_penalty(metric=metric[0])
)
else:
metric_val = (
data_dist.metric_total(expected_values=hypo_asimov_dist,
metric=metric[0], metric_kwargs=metric_kwargs)
+ hypo_maker.params.priors_penalty(metric=metric[0])
)
except Exception as e:
if self.blindness:
logging.error('Minimizer failed')
else :
logging.error(
'Failed when computing metric with free params %s',
hypo_maker.params.free
)
logging.error(str(e))
raise
penalty = 0
if external_priors_penalty is not None:
penalty = external_priors_penalty(hypo_maker=hypo_maker,metric=metric)
# Report status of metric & params (except if blinded)
if self.blindness:
msg = ('minimizer iteration: #%6d | function call: #%6d'
%(self._nit, counter.count))
else:
#msg = '%s=%.6e | %s' %(metric, metric_val, hypo_maker.params.free)
msg = '%s %s %s | ' %(('%d'%self._nit).center(6),
('%d'%counter.count).center(10),
format(metric_val, '0.5e').rjust(12))
msg += ' '.join([('%0.5e'%p.value.m).rjust(12)
for p in hypo_maker.params.free])
if external_priors_penalty is not None:
msg += f" | {penalty:11.4e}"
if self.pprint:
sys.stdout.write(msg + '\n')
sys.stdout.flush()
else:
logging.trace(msg)
counter += 1
if not self.blindness:
fit_history.append(
[metric_val] + [v.value.m for v in hypo_maker.params.free]
)
if external_priors_penalty is not None:
metric_val += external_priors_penalty(hypo_maker=hypo_maker,metric=metric)
return sign*metric_val
def _minimizer_callback(self, xk, *unused_args, **unused_kwargs): # pylint: disable=unused-argument
"""Passed as `callback` parameter to `optimize.minimize`, and is called
after each iteration. Keeps track of number of iterations.
Parameters
----------
xk : list
Parameter vector
"""
self._nit += 1
[docs]
def MCMC_sampling(self, data_dist, hypo_maker, metric, nwalkers, burnin, nsteps,
return_burn_in=False, random_state=None, sampling_algorithm=None):
"""Performs MCMC sampling. Only supports serial (single CPU) execution at the
moment. See issue #830.
Parameters
----------
data_dist : Sequence of MapSets or MapSet
Data distribution to be fit. Can be an actual-, Asimov-, or pseudo-data
distribution (where the latter two are derived from simulation and so aren't
technically "data").
hypo_maker : Detectors or DistributionMaker
Creates the per-bin expectation values per map based on its param values.
Free params in the `hypo_maker` are modified by the minimizer to achieve a
"best" fit.
metric : string or iterable of strings
Metric by which to evaluate the fit. See documentation of Map.
nwalkers : int
Number of walkers
burnin : int
Number of steps in burn in phase
nSteps : int
Number of steps after burn in
return_burn_in : bool
Also return the steps of the burn in phase. Default is False.
random_state : None or type accepted by utils.random_numbers.get_random_state
Random state of the walker starting points. Default is None.
sampling_algorithm : None or emcee.moves object
Sampling algorithm used by the emcee sampler. None means to use the default which
is a Goodman & Weare “stretch move” with parallelization.
See https://emcee.readthedocs.io/en/stable/user/moves/#moves-user to learn more
about the emcee sampling algorithms.
Returns
-------
scaled_chain : numpy array
Array containing all points in the parameter space visited by each walker.
It is sorted by steps, so all the first steps of all walkers come first.
To for example get all values of the Nth parameter and the ith walker, use
scaled_chain[i::nwalkers, N].
scaled_chain_burnin : numpy array (optional)
Same as scaled_chain, but for the burn in phase.
"""
import emcee
assert 'llh' in metric or 'chi2' in metric, 'Use either a llh or chi2 metric'
if 'chi2' in metric:
warnings.warn("You are using a chi2 metric for the MCMC sampling."
"The sampler will assume that llh=0.5*chi2.")
ndim = len(hypo_maker.params.free)
bounds = np.repeat([[0,1]], ndim, axis=0)
rs = get_random_state(random_state)
p0 = rs.random(ndim * nwalkers).reshape((nwalkers, ndim))
def func(scaled_param_vals, bounds, data_dist, hypo_maker, metric):
"""Function called by the MCMC sampler. Similar to _minimizer_callable it
returns the current metric value + prior penalties.
"""
if np.any(scaled_param_vals > np.array(bounds)[:, 1]) or np.any(scaled_param_vals < np.array(bounds)[:, 0]):
return -np.inf
sign = +1 if metric in METRICS_TO_MAXIMIZE else -1
if 'llh' in metric:
N = 1
elif 'chi2' in metric:
N = 0.5
hypo_maker._set_rescaled_free_params(scaled_param_vals) # pylint: disable=protected-access
hypo_asimov_dist = hypo_maker.get_outputs(return_sum=True)
metric_val = (
N * data_dist.metric_total(expected_values=hypo_asimov_dist, metric=metric)
+ hypo_maker.params.priors_penalty(metric=metric)
)
return sign*metric_val
sampler = emcee.EnsembleSampler(
nwalkers, ndim, func,
moves=sampling_algorithm,
args=[bounds, data_dist, hypo_maker, metric]
)
if self.pprint:
sys.stdout.write('Burn in')
sys.stdout.flush()
pos, prob, state = sampler.run_mcmc(p0, burnin, progress=self.pprint)
if return_burn_in:
flatchain_burnin = sampler.flatchain
scaled_chain_burnin = np.full_like(flatchain_burnin, np.nan, dtype=FTYPE)
param_copy_burnin = ParamSet(hypo_maker.params.free)
for s, sample in enumerate(flatchain_burnin):
for dim, rescaled_val in enumerate(sample):
param = param_copy_burnin[dim]
param._rescaled_value = rescaled_val
val = param.value.m
scaled_chain_burnin[s, dim] = val
sampler.reset()
if self.pprint:
sys.stdout.write('Main sampling')
sys.stdout.flush()
sampler.run_mcmc(pos, nsteps, progress=self.pprint)
flatchain = sampler.flatchain
scaled_chain = np.full_like(flatchain, np.nan, dtype=FTYPE)
param_copy = ParamSet(hypo_maker.params.free)
for s, sample in enumerate(flatchain):
for dim, rescaled_val in enumerate(sample):
param = param_copy[dim]
param._rescaled_value = rescaled_val
val = param.value.m
scaled_chain[s, dim] = val
if return_burn_in:
return scaled_chain, scaled_chain_burnin
else:
return scaled_chain
[docs]
class Analysis(BasicAnalysis):
"""Analysis class for "canonical" IceCube/DeepCore/PINGU analyses.
* "Data" distribution creation (via passed `data_maker` object)
* "Expected" distribution creation (via passed `distribution_maker` object)
* Minimizer Interface (via method `_minimizer_callable`)
Interfaces to a minimizer for modifying the free parameters of the
`distribution_maker` to fit its output (as closely as possible) to the
data distribution is provided. See [minimizer_settings] for
"""
def __init__(self):
self._nit = 0
self.pprint = True
self.blindness = False
[docs]
def fit_hypo(self, data_dist, hypo_maker, metric, minimizer_settings,
hypo_param_selections=None, reset_free=True,
check_octant=True, fit_octants_separately=None,
check_ordering=False, other_metrics=None,
blind=False, pprint=True, external_priors_penalty=None):
"""Fitter "outer" loop: If `check_octant` is True, run
`fit_hypo_inner` starting in each octant of theta23 (assuming that
is a param in the `hypo_maker`). Otherwise, just run the inner
method once.
Note that prior to running the fit, the `hypo_maker` has
`hypo_param_selections` applied and its free parameters are reset to
their nominal values.
Parameters
----------
data_dist : MapSet or List of MapSets
Data distribution(s). These are what the hypothesis is tasked to
best describe during the optimization process.
hypo_maker : Detectors, DistributionMaker or instantiable thereto
Generates the expectation distribution under a particular
hypothesis. This typically has (but is not required to have) some
free parameters which can be modified by the minimizer to optimize
the `metric`.
hypo_param_selections : None, string, or sequence of strings
A pipeline configuration can have param selectors that allow
switching a parameter among two or more values by specifying the
corresponding param selector(s) here. This also allows for a single
instance of a DistributionMaker to generate distributions from
different hypotheses.
metric : string or iterable of strings
The metric to use for optimization. Valid metrics are found in
`VALID_METRICS`. Note that the optimized hypothesis also has this
metric evaluated and reported for each of its output maps.
minimizer_settings : string or dict
check_octant : bool
If theta23 is a parameter to be used in the optimization (i.e.,
free), the fit will be re-run in the second (first) octant if
theta23 is initialized in the first (second) octant.
reset_free : bool
Resets all free parameters to values defined in stages when starting a fit
fit_octants_separately : bool
If 'check_octant' is set so that the two octants of theta23 are
individually checked, this flag enforces that each theta23 can
only vary within the octant currently being checked (e.g. the
minimizer cannot swap octants). Deprecated.
check_ordering : bool
If the ordering is not in the hypotheses already being tested, the
fit will be run in both orderings.
other_metrics : None, string, or list of strings
After finding the best fit, these other metrics will be evaluated
for each output that contributes to the overall fit. All strings
must be valid metrics, as per `VALID_METRICS`, or the
special string 'all' can be specified to evaluate all
VALID_METRICS..
pprint : bool
Whether to show live-update of minimizer progress.
blind : bool or int
Whether to carry out a blind analysis. If True or 1, this hides actual
parameter values from display and disallows these (as well as Jacobian,
Hessian, etc.) from ending up in logfiles. If given an integer > 1, the
fitted parameters are also prevented from being stored in fit info
dictionaries.
external_priors_penalty : func
User defined prior penalty function. Adds an extra penalty
to the metric that is minimized, depending on the input function.
Returns
-------
best_fit_info : HypoFitResult (see fit_hypo_inner method for details of
`fit_info` dict)
alternate_fits : list of `fit_info` from other fits run
"""
if fit_octants_separately is not None:
warnings.warn("fit_octants_separately is deprecated and will be ignored, "
"octants are always fit separately now.", DeprecationWarning)
self.blindness = blind
self.pprint = pprint
if hypo_param_selections is None:
hypo_param_selections = hypo_maker.param_selections
if isinstance(metric, str):
metric = [metric]
# Check number of used metrics
if hypo_maker.__class__.__name__ == "Detectors":
if len(metric) == 1: # One metric for all detectors
metric = list(metric) * len(hypo_maker.distribution_makers)
elif len(metric) != len(hypo_maker.distribution_makers):
raise IndexError(f"Number of defined metrics does not match with number of detectors.")
else:
assert len(metric) == 1, f"Only one metric allowed for DistributionMaker"
if check_ordering:
if 'nh' in hypo_param_selections or 'ih' in hypo_param_selections:
raise ValueError('One of the orderings has already been '
'specified as one of the hypotheses but the '
'fit has been requested to check both. These '
'are incompatible.')
logging.info('Performing fits in both orderings.')
extra_param_selections = ['nh', 'ih']
else:
extra_param_selections = [None]
alternate_fits = []
# TODO: Pass alternative fits up the chain
for extra_param_selection in extra_param_selections:
if extra_param_selection is not None:
full_param_selections = hypo_param_selections
full_param_selections.append(extra_param_selection)
else:
full_param_selections = hypo_param_selections
# Select the version of the parameters used for this hypothesis
hypo_maker.select_params(full_param_selections)
# Reset free parameters to nominal values
if reset_free:
hypo_maker.reset_free()
if check_octant:
method = "octants"
method_kwargs = {
"angle": "theta23",
"inflection_point": 45 * ureg.deg,
"reset_free": reset_free,
}
local_fit_kwargs = {
"method": "scipy",
"method_kwargs": minimizer_settings,
"local_fit_kwargs": None,
}
else:
method = "scipy"
method_kwargs = minimizer_settings
local_fit_kwargs = None
# Perform the fit
best_fit_info = self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
method, method_kwargs, local_fit_kwargs
)
return best_fit_info, alternate_fits
[docs]
def nofit_hypo(self, data_dist, hypo_maker, hypo_param_selections,
hypo_asimov_dist, metric, other_metrics=None, blind=False, external_priors_penalty=None):
"""Fitting a hypo to a distribution generated by its own
distribution maker is unnecessary. In such a case, use this method
(instead of `fit_hypo`) to still retrieve meaningful information for
e.g. the match metrics.
Parameters
----------
data_dist : MapSet or List of MapSets
hypo_maker : Detectors or DistributionMaker
hypo_param_selections : None, string, or sequence of strings
hypo_asimov_dist : MapSet or List of MapSets
metric : string or iterable of strings
other_metrics : None, string, or sequence of strings
blind : bool
external_priors_penalty : func
"""
fit_info = HypoFitResult()
# NOTE: Select params but *do not* reset to nominal values to record
# the current (presumably already optimal) param values
hypo_maker.select_params(hypo_param_selections)
if isinstance(metric, str):
metric = [metric]
# Check number of used metrics
if hypo_maker.__class__.__name__ == "Detectors":
if len(metric) == 1: # One metric for all detectors
metric = list(metric) * len(hypo_maker.distribution_makers)
elif len(metric) != len(hypo_maker.distribution_makers):
raise IndexError(f"Number of defined metrics does not match with number of detectors.")
elif isinstance(hypo_asimov_dist, MapSet) or isinstance(hypo_asimov_dist, list):
# DistributionMaker object (list means variable binning)
assert len(metric) == 1, f"Only one metric allowed for DistributionMaker"
else:
raise NotImplementedError(f"hypo_maker returned output of type {type(hypo_asimov_dist)}")
fit_info.metric = metric
# Assess the fit: whether the data came from the hypo_asimov_dist
try:
if hypo_maker.__class__.__name__ == "Detectors":
metric_val = 0
for i in range(len(hypo_maker.distribution_makers)):
data = data_dist[i].metric_total(expected_values=hypo_asimov_dist[i], metric=metric[i])
metric_val += data
priors = hypo_maker.params.priors_penalty(metric=metric[0]) # uses just the "first" metric for prior
metric_val += priors
elif isinstance(data_dist, list): # DistributionMaker object with VarBinning
metric_val = 0
for i in range(len(data_dist)):
metric_val += data_dist[i].metric_total(expected_values=hypo_asimov_dist[i], metric=metric[0])
metric_val += hypo_maker.params.priors_penalty(metric=metric[0])
else: # DistributionMaker object with MultiDimBinning
if 'generalized_poisson_llh' == metric[0]:
raise NotImplementedError(
"generalized_poisson_llh isn't correctly implemented any longer!"
)
# FIXME: these `output_mode` and `force_standard_output` kwargs were removed in
# https://github.com/icecube/pisa/commit/7a4e875aa7bdc52ea64a5270e9808d866d1395f3
hypo_asimov_dist = hypo_maker.get_outputs(return_sum=False, output_mode='binned', force_standard_output=False)
hypo_asimov_dist = merge_mapsets_together(mapset_list=hypo_asimov_dist)
data_dist = data_dist.maps[0] # Extract the map from the MapSet
metric_kwargs = {'empty_bins':hypo_maker.empty_bin_indices}
else:
hypo_asimov_dist = hypo_maker.get_outputs(return_sum=True)
if isinstance(hypo_asimov_dist, HypoFitResult):
hypo_asimov_dist = hypo_asimov_dist['weights']
metric_kwargs = {}
metric_val = (
data_dist.metric_total(expected_values=hypo_asimov_dist,
metric=metric[0], metric_kwargs=metric_kwargs)
+ hypo_maker.params.priors_penalty(metric=metric[0])
)
if external_priors_penalty is not None:
metric_val += external_priors_penalty(hypo_maker=hypo_maker,metric=metric[0])
except Exception as e:
if self.blindness:
logging.error('Minimizer failed')
else :
logging.error(
'Failed when computing metric with free params %s',
hypo_maker.params.free
)
logging.error(str(e))
raise
fit_info.metric_val = metric_val
if self.blindness:
# Okay, if blind analysis is being performed, reset the values so
# the user can't find them in the object
hypo_maker.reset_free()
fit_info.metric_val = ParamSet()
else:
fit_info.metric_val = deepcopy(hypo_maker.params)
if hypo_maker.__class__.__name__ == "Detectors":
fit_info.detailed_metric_info = [fit_info.get_detailed_metric_info(
data_dist=data_dist[i], hypo_asimov_dist=hypo_asimov_dist[i],
params=hypo_maker.distribution_makers[i].params, metric=metric[i],
other_metrics=other_metrics, detector_name=hypo_maker.det_names[i]
) for i in range(len(data_dist))]
elif isinstance(data_dist, list): # DistributionMaker object with VarBinning
fit_info.detailed_metric_info = [fit_info.get_detailed_metric_info(
data_dist=data_dist[i], hypo_asimov_dist=hypo_asimov_dist[i],
params=hypo_maker.params, metric=metric[0], other_metrics=other_metrics,
detector_name=hypo_maker.detector_name
) for i in range(len(data_dist))]
else: # DistributionMaker object with MultiDimBinning
if 'generalized_poisson_llh' == metric[0]:
raise NotImplementedError(
"generalized_poisson_llh isn't correctly implemented any longer!"
)
# FIXME: these `output_mode` and `force_standard_output` kwargs were removed in
# https://github.com/icecube/pisa/commit/7a4e875aa7bdc52ea64a5270e9808d866d1395f3
generalized_poisson_dist = hypo_maker.get_outputs(return_sum=False, force_standard_output=False)
generalized_poisson_dist = merge_mapsets_together(mapset_list=generalized_poisson_dist)
else:
generalized_poisson_dist = None
fit_info.detailed_metric_info = fit_info.get_detailed_metric_info(
data_dist=data_dist, hypo_asimov_dist=hypo_asimov_dist, generalized_poisson_hypo=generalized_poisson_dist,
params=hypo_maker.params, metric=metric[0], other_metrics=other_metrics,
detector_name=hypo_maker.detector_name
)
fit_info.minimizer_time = 0 * ureg.sec
fit_info.num_distributions_generated = 0
fit_info.minimizer_metadata = OrderedDict()
fit_info.hypo_asimov_dist = hypo_asimov_dist
return fit_info
# TODO: move the complexity of defining a scan into a class with various
# factory methods, and just pass that class to the scan method; we will
# surely want to use scanning over parameters in more general ways, too:
# * set (some) fixed params, then run (minimizer, scan, etc.) on free
# params
# * set (some free or fixed) params, then check metric
# where the setting of the params is done for some number of values.
[docs]
def scan(self, data_dist, hypo_maker, metric, hypo_param_selections=None,
param_names=None, steps=None, values=None, only_points=None,
outer=True, profile=True, minimizer_settings=None, outfile=None,
debug_mode=1, **kwargs):
"""Set hypo maker parameters named by `param_names` according to
either values specified by `values` or number of steps specified by
`steps`, and return the `metric` indicating how well the data
distribution is described by each distribution.
Some flexibility in how the user can specify `values` is allowed, based
upon the shapes of `param_names` and `values` and how the `outer` flag
is set.
Either `values` or `steps` must be specified, but not both.
Parameters
----------
data_dist : Sequence of MapSets or MapSet
Data distribution(s). These are what the hypothesis is tasked to
best describe during the optimization/comparison process.
hypo_maker : Detectors, DistributionMaker or instantiable thereto
Generates the expectation distribution under a particular
hypothesis. This typically has (but is not required to have) some
free parameters which will be modified by the minimizer to optimize
the `metric` in case `profile` is set to True.
hypo_param_selections : None, string, or sequence of strings
A pipeline configuration can have param selectors that allow
switching a parameter among two or more values by specifying the
corresponding param selector(s) here. This also allows for a single
instance of a DistributionMaker to generate distributions from
different hypotheses.
metric : string or iterable of strings
The metric to use for optimization/comparison. Note that the
optimized hypothesis also has this metric evaluated and reported for
each of its output maps. Confer `pisa.core.map` for valid metrics.
param_names : None, string, or sequence of strings
If None, assume all parameters are to be scanned; otherwise,
specifies only the name or names of parameters to be scanned.
steps : None, integer, or sequence of integers
Number of steps to take within the allowed range of the parameter
(or parameters). Value(s) specified for `steps` must be >= 2. Note
that the endpoints of the range are always included, and numbers of
steps higher than 2 fill in between the endpoints.
* If integer...
Take this many steps for each specified parameter.
* If sequence of integers...
Take the coresponding number of steps within the allowed range
for each specified parameter.
values : None, scalar, sequence of scalars, or sequence-of-sequences
* If scalar...
Set this value for the (one) param name in `param_names`.
* If sequence of scalars...
* if len(param_names) is 1, set its value to each number in the
sequence.
* otherwise, set each param in param_names to the corresponding
value in `values`. There must be the same number of param names
as values.
* If sequence of (sequences or iterables)...
* Each param name corresponds to one of the inner sequences, in
the order that the param names are specified.
* If `outer` is False, all inner sequences must have the same
length, and there will be one distribution generated for
each set of values across the inner sequences. In other words,
there will be a total of len(inner sequence) distribution generated.
* If `outer` is True, the lengths of inner sequences needn't be
the same. This takes the outer product of the passed sequences
to arrive at the permutations of the parameter values that will
be used to produce the distributions (essentially nested
loops over each parameter). E.g., if two params are scanned,
for each value of the first param's inner sequence, a
distribution is produced for every value of the second param's
inner sequence. In total, there will be
``len(inner seq0) * len(inner seq1) * ...``
distributions produced.
only_points : None, integer, or even-length sequence of integers
Only select subset of points to be analysed by specifying their
range of positions within the whole set (0-indexed, incremental).
For the lazy amongst us...
outer : bool
If set to True and a sequence of sequences is passed for `values`,
the points scanned are the *outer product* of the inner sequences.
See `values` for a more detailed explanation.
profile : bool
If set to True, minimizes specified metric over all free parameters
at each scanned point. Otherwise keeps them at their nominal values
and only performs grid scan of the parameters specified in
`param_names`.
minimizer_settings : dict
Dictionary containing the settings for minimization, which are
only needed if `profile` is set to True. Hint: it has proven useful
to sprinkle with a healthy dose of scepticism.
outfile : string
Outfile to store results to. Will be updated at each scan step to
write out intermediate results to prevent loss of data in case
the apocalypse strikes after all.
debug_mode : int, either one of [0, 1, 2]
If set to 2, will add a wealth of minimisation history and physics
information to the output file. Otherwise, the output will contain
the essentials to perform an analysis (0), or will hopefully be
detailed enough for some simple debugging (1). Any other value for
`debug_mode` will be set to 2.
"""
if debug_mode not in (0, 1, 2):
debug_mode = 2
# Either `steps` or `values` must be specified, but not both (xor)
assert (steps is None) != (values is None)
if isinstance(param_names, str):
param_names = [param_names]
nparams = len(param_names)
hypo_maker.select_params(hypo_param_selections)
if values is not None:
if np.isscalar(values):
values = np.array([values])
assert nparams == 1
for i, val in enumerate(values):
if not np.isscalar(val):
# no scalar here, need a corresponding parameter name
assert nparams >= i+1
else:
# a scalar, can either have only one parameter or at least
# this many
assert nparams == 1 or nparams >= i+1
if nparams > 1:
values[i] = np.array([val])
else:
ranges = [hypo_maker.params[pname].range for pname in param_names]
if np.issubdtype(type(steps), int):
assert steps >= 2
values = [np.linspace(r[0], r[1], steps)*r[0].units
for r in ranges]
else:
assert len(steps) == nparams
assert np.all(np.array(steps) >= 2)
values = [np.linspace(r[0], r[1], steps[i])*r[0].units
for i, r in enumerate(ranges)]
if nparams > 1:
steplist = [[(pname, val) for val in values[i]]
for (i, pname) in enumerate(param_names)]
else:
steplist = [[(param_names[0], val) for val in values[0]]]
#Number of steps must be > 0
assert len(steplist) > 0
points_acc = []
if only_points is not None:
assert len(only_points) == 1 or len(only_points) % 2 == 0
if len(only_points) == 1:
points_acc = only_points
for i in range(0, len(only_points)-1, 2):
points_acc.extend(
list(range(only_points[i], 1 + only_points[i + 1]))
)
# Instead of introducing another multitude of tests above, check here
# whether the lists of steps all have the same length in case `outer`
# is set to False
if nparams > 1 and not outer:
assert np.all(len(steps) == len(steplist[0]) for steps in steplist)
loopfunc = zip
else:
# With single parameter, can use either `zip` or `product`
loopfunc = product
params = hypo_maker.params
# Fix the parameters to be scanned if `profile` is set to True
params.fix(param_names)
results = {'steps': {}, 'results': []}
results['steps'] = {pname: [] for pname in param_names}
for i, pos in enumerate(loopfunc(*steplist)):
if points_acc and i not in points_acc:
continue
msg = ''
for (pname, val) in pos:
params[pname].value = val
results['steps'][pname].append(val)
if isinstance(val, float):
msg += '%s = %.2f '%(pname, val)
elif isinstance(val, ureg.Quantity):
msg += '%s = %.2f '%(pname, val.magnitude)
else:
raise TypeError("val is of type %s which I don't know "
"how to deal with in the output "
"messages."% type(val))
logging.info('Working on point ' + msg)
hypo_maker.update_params(params)
# TODO: consistent treatment of hypo_param_selections and scanning
if not profile or not hypo_maker.params.free:
logging.info('Not optimizing since `profile` set to False or'
' no free parameters found...')
best_fit = self.nofit_hypo(
data_dist=data_dist,
hypo_maker=hypo_maker,
hypo_param_selections=hypo_param_selections,
hypo_asimov_dist=hypo_maker.get_outputs(return_sum=True),
metric=metric,
**{k: v for k,v in kwargs.items() if k not in ["pprint","reset_free","check_octant"]}
)
else:
logging.info('Starting optimization since `profile` requested.')
best_fit, _ = self.fit_hypo(
data_dist=data_dist,
hypo_maker=hypo_maker,
hypo_param_selections=hypo_param_selections,
metric=metric,
minimizer_settings=minimizer_settings,
**kwargs
)
# TODO: serialisation!
for k in best_fit.minimizer_metadata:
if k in ['hess', 'hess_inv']:
logging.debug("deleting %s", k)
del best_fit.minimizer_metadata[k]
best_fit.metric_val = deepcopy(
best_fit.metric_val.serializable_state
)
if isinstance(best_fit.hypo_asimov_dist, Sequence):
best_fit.hypo_asimov_dist = [deepcopy(
best_fit.hypo_asimov_dist[i].serializable_state
) for i in range(len(best_fit.hypo_asimov_dist))]
else:
best_fit.hypo_asimov_dist = deepcopy(
best_fit.hypo_asimov_dist.serializable_state
)
# decide which information to retain based on chosen debug mode
if debug_mode == 0 or debug_mode == 1:
try:
del best_fit['fit_history']
del best_fit.hypo_asimov_dist
except KeyError:
pass
if debug_mode == 0:
# torch the woods!
try:
del best_fit.minimizer_metadata
del best_fit.minimizer_time
except KeyError:
pass
results['results'].append(best_fit)
if outfile is not None:
# store intermediate results
to_file(results, outfile)
return results
def test_basic_analysis(pprint=False):
"""Test recursive fit strategies with BasicAnalysis."""
from pisa.core.distribution_maker import DistributionMaker
from pisa.utils.config_parser import parse_pipeline_config
###### Make Pipeline Configuration #########
# We make a configuration of two pipelines where some, but not all, parameters
# are shared between them. This checks for memory inconsistencies.
config = parse_pipeline_config('settings/pipeline/fast_example.cfg')
config2 = deepcopy(config)
# Remove one stage to remove some parameters from only one pipeline
del config2[("aeff", "aeff")]
dm = DistributionMaker([config, config2])
dm.select_params('nh')
dm.pipelines[0].params["aeff_scale"].value = 1.5
# make data distribution to fit against
data_dist = dm.get_outputs(return_sum=True).fluctuate(
method="poisson", random_state=0
)
#### Test subclassing
# It should be trivial to add a fit method to the BasicAnalysis class and use
# it by passing its name (without the "_fit_" prefix) to the dictionary.
class SubclassedAnalysis(BasicAnalysis):
def _fit_nonsense(
self, data_dist, hypo_maker, metric,
external_priors_penalty, method_kwargs, local_fit_kwargs
):
"""A custom, nonsensical fit method.
This method does nothing except to set theta23 to 42 deg for no reason.
"""
logging.info("Starting nonsense fit (setting theta23 to 42 deg)...")
for pipeline in hypo_maker:
if "theta23" in pipeline.params.free.names:
pipeline.params.theta23.value = 42 * ureg["deg"]
best_fit_info = self.fit_recursively(
data_dist, hypo_maker, metric, external_priors_penalty,
local_fit_kwargs["method"], local_fit_kwargs["method_kwargs"],
local_fit_kwargs["local_fit_kwargs"]
)
return best_fit_info
ana = SubclassedAnalysis()
ana.pprint = pprint
# Test that global optimization with CRS2 is deterministic as long as a seed
# is provided.
fit_nlopt_crs2 = OrderedDict(
method="nlopt",
method_kwargs={
"algorithm": "NLOPT_GN_CRS2_LM",
"ftol_rel": 1e-1,
"ftol_abs": 1e-1,
"population": 5,
"maxeval": 20,
"seed": 0,
},
local_fit_kwargs=None,
)
dm.reset_free()
best_fit_info_seed_0 = ana.fit_recursively(
data_dist,
dm,
"chi2",
None,
**fit_nlopt_crs2
)
logging.info("Best fit params with seed 0:")
logging.info(repr(best_fit_info_seed_0.params.free))
fit_nlopt_crs2["method_kwargs"]["seed"] = 1
dm.reset_free()
best_fit_info_seed_1 = ana.fit_recursively(
data_dist,
dm,
"chi2",
None,
**fit_nlopt_crs2
)
logging.info("Best fit params with seed 1:")
logging.info(repr(best_fit_info_seed_1.params.free))
fit_nlopt_crs2["method_kwargs"]["seed"] = 0
dm.reset_free()
best_fit_info_seed_0_reprod = ana.fit_recursively(
data_dist,
dm,
"chi2",
None,
**fit_nlopt_crs2
)
logging.info("Best fit params with seed 0, reproduced:")
logging.info(repr(best_fit_info_seed_0_reprod.params.free))
assert best_fit_info_seed_0.params == best_fit_info_seed_0_reprod.params
assert not (best_fit_info_seed_0.params == best_fit_info_seed_1.params)
scipy_settings = {
"method": {
"value": "L-BFGS-B",
"desc": "The string to pass to scipy.optimize.minimize so it knows what to use"
},
"options":{
"value": {
"disp" : 0,
"ftol" : 1.0e-1,
"eps" : 1.0e-6,
# we set a very low number of iterations so that this test exits early
# WILL CAUSE WARNINGS SAYING THAT THE OPTIMIZATION FAILED, BUT THAT IS OK!
#"maxiter": 2
},
"desc": {
"disp" : "Set to True to print convergence messages",
"ftol" : "Precision goal for the value of f in the stopping criterion",
"eps" : "Step size used for numerical approximation of the jacobian.",
"maxiter": "Maximum number of iteration"
}
},
}
###### Fit strategy to test ########
# This is a ridiculously complex fit strategy for a simple std. osc. fit
# that no one would use in real life, just to test the fit functions.
# Staged fit in two stages:
# 1. Best of:
# --> local fit using Simplex from NLOPT on a 2x1 grid
# --> local fit using Migrad from Minuit
# 2. --> for each range in deltam31:
# |--> starting from the best fit point found by local fit, shifted to range
# |--> fit with octant reflection, where internal fit is scipy
local_simplex = OrderedDict(
method="nlopt",
method_kwargs={
"algorithm": "NLOPT_LN_NELDERMEAD",
"ftol_rel": 1e-1,
"ftol_abs": 1e-1,
"maxeval": 10,
"initial_step": 0.2 # as a fraction of the total range
},
local_fit_kwargs=None
)
grid_scan = OrderedDict(
method="grid_scan",
method_kwargs={
"grid": {
"deltam31": np.array([3e-3, 5e-3]) * ureg["eV^2"],
"theta23": np.array([30]) * ureg["deg"]
},
"refined_fit": local_simplex
},
local_fit_kwargs=local_simplex
)
local_minuit = OrderedDict(
method="iminuit",
method_kwargs={
"tol": 10,
},
local_fit_kwargs=None
)
local_nonsense_minuit = OrderedDict(
method="nonsense",
method_kwargs=None,
local_fit_kwargs=local_minuit
)
best_of = OrderedDict(
method="best_of",
method_kwargs=None,
local_fit_kwargs=[
local_nonsense_minuit,
grid_scan
]
)
# a standard analysis strategy with an octant flip at 45 deg in theta23
standard_analysis = OrderedDict(
method="fit_octants",
method_kwargs={
"angle": "theta23",
"inflection_point": 45 * ureg.deg,
},
local_fit_kwargs={
"method": "scipy",
"method_kwargs": scipy_settings,
"local_fit_kwargs": None
}
)
# fit different ranges in mass splitting, and to the octant fits in each range
fit_in_ranges = OrderedDict(
method="fit_ranges",
method_kwargs={
"param_name": "deltam31",
"ranges": np.array([[0.001, 0.004], [0.004, 0.007]]) * ureg["eV^2"],
"reset_free": True
},
local_fit_kwargs=standard_analysis
)
# put together the full fit strategy
staged_fit = OrderedDict(
method="staged",
method_kwargs=None,
local_fit_kwargs=[
best_of,
fit_in_ranges
]
)
# changing the parameter values in theta23 such that the fits starts offset from
# the truth
# use this opportunity to test the update_param_values function as well
for p in dm.pipelines:
p.params.deltam31.is_fixed = False
mod_th23 = deepcopy(dm.params.theta23)
mod_th23.range = (0 * ureg.deg, 90 *ureg.deg)
mod_th23.value = 30 * ureg.deg
mod_th23.nominal_value = 30 * ureg.deg
update_param_values(dm, mod_th23, update_nominal_values=True, update_range=True)
# make sure that the parameter values inside the ParamSelector were changed and
# will not be overwritten by a call to `select_params`
mod_params = deepcopy(dm.params)
dm.select_params('nh')
assert mod_params == dm.params
# test alternative input to `update_param_values` where `hypo_maker` is just a
# single Pipeline
for pipeline in dm:
update_param_values(pipeline, mod_th23)
# the call above should just have no effect at all since we set everything to the
# same value
assert mod_params == dm.params
# resetting free parameters should now set theta23 to 30 degrees since that is
# the new nominal value
dm.reset_free()
assert dm.params.theta23.value.m_as("deg") == 30
# store all the original ranges and nominal values
# --> The fits should never return results where these have changed, even though
# they are sometimes changed within them.
original_ranges = dict((p.name, p.range) for p in dm.params)
original_nom_vals = dict((p.name, p.nominal_value) for p in dm.params)
# ACTUALLY RUN THE FIT
best_fit_info = ana.fit_recursively(
data_dist,
dm,
"chi2",
None,
**staged_fit
)
assert dm.params == best_fit_info.params
# there had been problems in the past where the range of the parameter that is
# changed by the octant flip was not reversed properly
for p in dm.params:
msg = f"mismatch in param {p.name}:\n"
msg += f"range is {p.range}, should be {original_ranges[p.name]}\n"
msg += f"nom. value is {p.nominal_value}, should be {original_nom_vals[p.name]}"
if p.range is not None:
assert p.range[0] == original_ranges[p.name][0], msg
assert p.range[0] == original_ranges[p.name][0], msg
if p.nominal_value is not None:
assert p.nominal_value == original_nom_vals[p.name], msg
# Here we make sure that making a param selection doesn't overwrite the fitted
# parameters. The Analysis should have changed the parameters inside the
# ParamSelector.
dm.select_params('nh')
assert dm.params == best_fit_info.params
for p in dm.params:
msg = f"mismatch in param {p.name}:\n"
msg += f"range is {p.range}, should be {original_ranges[p.name]}\n"
msg += f"nom. value is {p.nominal_value}, should be {original_nom_vals[p.name]}"
if p.range is not None:
assert p.range[0] == original_ranges[p.name][0], msg
assert p.range[0] == original_ranges[p.name][0], msg
if p.nominal_value is not None:
assert p.nominal_value == original_nom_vals[p.name], msg
dm.select_params('ih')
dm.select_params('nh')
assert dm.params == best_fit_info.params
for p in dm.params:
msg = f"mismatch in param {p.name}:\n"
msg += f"range is {p.range}, should be {original_ranges[p.name]}\n"
msg += f"nom. value is {p.nominal_value}, should be {original_nom_vals[p.name]}"
if p.range is not None:
assert p.range[0] == original_ranges[p.name][0], msg
assert p.range[0] == original_ranges[p.name][0], msg
if p.nominal_value is not None:
assert p.nominal_value == original_nom_vals[p.name], msg
logging.info('<< PASS : test_basic_analysis >>')
def test_constrained_minimization(pprint=False):
"""Test scipy solvers without or with equality and inequality
constraints. All are run with default options as set by
`set_minimizer_defaults`."""
from pisa.core.distribution_maker import DistributionMaker
config = 'settings/pipeline/fast_example.cfg'
dm = DistributionMaker(config)
data_dist = dm.get_outputs(return_sum=True)
### slsqp test with constraints ###
def slsqp_constr():
ana = BasicAnalysis()
ana.pprint = pprint
min_sett = {
"method": {"value": "slsqp", "desc": ""},
"options": {"value": {}, "desc": {}}
}
min_delta_index = 5e-3
max_aeff_scale = 0.986
t23 = 44.2
# constraint function can be callable or string
constrs_list = [
{'type': 'ineq',
'fun': lambda params: params.delta_index.m_as("dimensionless") - min_delta_index},
{'type': 'ineq',
'fun': 'lambda p: -p.aeff_scale.m_as("dimensionless") + %s' % max_aeff_scale},
{'type': 'eq',
'fun': lambda params: params.theta23.m_as("degree") - t23}
]
min_sett["options"]["value"]["constraints"] = constrs_list
scipy_sett = {"method": "scipy", "method_kwargs": min_sett}
dm.params.theta23.randomize(random_state=1)
bf = ana.fit_recursively(
data_dist=data_dist,
hypo_maker=dm,
metric="chi2",
external_priors_penalty=None,
**scipy_sett
)
assert bf.minimizer_metadata['success']
tol = 1e-5
assert recursiveEquality(bf.params.theta23.m_as('degree'), t23)
assert bf.params.delta_index.m_as('dimensionless') >= min_delta_index - tol
assert bf.params.aeff_scale.m_as('dimensionless') <= max_aeff_scale + tol
### cobyla test with inequality constraints (doesn't support equalities) ###
def cobyla_constr():
ana = BasicAnalysis()
ana.pprint = pprint
min_t23 = 46.
min_aeff_scale = 1.02
constrs_list = [
{'type': 'ineq',
'fun': lambda params: params.theta23.m_as("dimensionless") - min_t23},
{'type': 'ineq',
'fun': lambda params: params.aeff_scale.m_as("dimensionless") - min_aeff_scale}
]
# FIXME: steps out of bounds whenever these are included and whether bounds are also implemented as constraints or not
min_sett = {
"method": {"value": "cobyla", "desc": ""},
"options": {"value": {"constraints": constrs_list}, "desc": {}}
}
scipy_sett = {"method": "scipy", "method_kwargs": min_sett}
dm.params.randomize_free(random_state=1)
bf = ana.fit_recursively(
data_dist=data_dist,
hypo_maker=dm,
metric="chi2",
external_priors_penalty=None,
**scipy_sett
)
assert bf.minimizer_metadata['success']
tol = 1e-5
assert bf.params.theta23.m_as('dimensionless') >= min_t23 - tol
assert bf.params.aeff_scale.m_as('dimensionless') >= min_aeff_scale - tol
### trust-constr test with constraints ###
def trust_constr_constr():
ana = BasicAnalysis()
ana.pprint = pprint
min_delta_index = 5e-3
max_aeff_scale = 0.986
t23 = 44.2
constrs_list = [
{'type': 'ineq',
'fun': lambda params: params.delta_index.m_as("dimensionless") - min_delta_index},
{'type': 'ineq',
'fun': 'lambda p: -p.aeff_scale.m_as("dimensionless") + %s' % max_aeff_scale},
{'type': 'eq',
'fun': lambda params: params.theta23.m_as("degree") - t23}
]
min_sett = {
"method": {"value": "trust-constr", "desc": ""},
"options": {"value": {"constraints": constrs_list}, "desc": {}}
}
scipy_sett = {"method": "scipy", "method_kwargs": min_sett}
dm.params.randomize_free(random_state=5)
bf = ana.fit_recursively(
data_dist=data_dist,
hypo_maker=dm,
metric="chi2",
external_priors_penalty=None,
**scipy_sett
)
assert bf.minimizer_metadata['success']
tol = 1e-5
assert recursiveEquality(bf.params.theta23.m_as('degree'), t23)
assert bf.params.delta_index.m_as('dimensionless') >= min_delta_index - tol
assert bf.params.aeff_scale.m_as('dimensionless') <= max_aeff_scale + tol
### Nelder-Mead test (no constraints supported) ###
def nm_unconstr():
ana = BasicAnalysis()
ana.pprint = pprint
min_sett = {
"method": {"value": "Nelder-Mead", "desc": ""},
"options": {"value": {}, "desc": {}}
}
scipy_sett = {"method": "scipy", "method_kwargs": min_sett}
dm.params.randomize_free(random_state=9)
bf = ana.fit_recursively(
data_dist=data_dist,
hypo_maker=dm,
metric="chi2",
external_priors_penalty=None,
**scipy_sett
)
assert bf.minimizer_metadata['success']
### finally an nlopt solver with constraints ###
def some_nlopt_constr():
ana = BasicAnalysis()
ana.pprint = pprint
min_t23 = 46.
method_kwargs = {
"algorithm": "NLOPT_LN_COBYLA",
"ftol_abs": 1e-3,
"ftol_rel": 1e-3,
"maxeval": 100,
"ineq_constraints": [
lambda params: params.theta23.m_as("degree") - min_t23
]
}
nlopt_sett = {"method": "nlopt", "method_kwargs": method_kwargs}
# fix 2/3 to make it converge faster
fix_params = ["delta_index", "aeff_scale"]
[dm.params.fix(p) for p in fix_params] # pylint: disable=expression-not-assigned
dm.params.randomize_free(random_state=11)
bf = ana.fit_recursively(
data_dist=data_dist,
hypo_maker=dm,
metric="chi2",
external_priors_penalty=None,
**nlopt_sett
)
assert bf.minimizer_metadata['success']
tol = 1e-5
assert bf.params.theta23.m_as('degree') >= min_t23 - tol
# unfix again
[dm.params.unfix(p) for p in fix_params] # pylint: disable=expression-not-assigned
# now run them all
slsqp_constr()
try:
cobyla_constr()
except Exception as e:
# don't fail, just document here
logging.error("Cobyla test with constraints failed: '%s'. This is under investigation..." % str(e))
trust_constr_constr()
nm_unconstr()
some_nlopt_constr()
logging.info('<< PASS : test_constrained_minimization >>')
def test_global_scipy_minimization(pprint=False):
from pisa.core.distribution_maker import DistributionMaker
config = 'settings/pipeline/fast_example.cfg'
dm = DistributionMaker(config)
dm.params.fix("theta23") # make it converge faster
data_dist = dm.get_outputs(return_sum=True)
def run_global_with_supported_local(
global_method_kwargs, constrs_list=None, constrs_test=None
):
ana = BasicAnalysis()
ana.pprint = pprint
assert (constrs_list is None) == (constrs_test is None)
glob_meth = global_method_kwargs["global_method"]
for loc_min in set(SUPPORTED_LOCAL_SCIPY_MINIMIZERS).union(('none',)):
min_sett = {
"method": {"value": loc_min, "desc": ""},
"options": {"value": {}, "desc": {}}
}
if loc_min in MINIMIZERS_ACCEPTING_CONSTRS and constrs_list is not None:
min_sett["options"]["value"]["constraints"] = deepcopy(constrs_list)
glob_sett = {
"method": "scipy",
"method_kwargs": global_method_kwargs,
"local_fit_kwargs": min_sett if loc_min != 'none' else None
}
dm.reset_free()
dm.params.aeff_scale.randomize(random_state=0)
try:
bf = ana.fit_recursively(
data_dist=data_dist,
hypo_maker=dm,
metric="chi2",
external_priors_penalty=None,
**glob_sett
)
if not bf.minimizer_metadata['success']:
raise RuntimeError("Minimizer unsuccessful")
if loc_min in MINIMIZERS_ACCEPTING_CONSTRS and constrs_list is not None:
constrs_test(bf)
except Exception as e:
# don't fail, just document here
logging.error(
f"test of {glob_meth} + {loc_min} failed: '{e}'. This is under investigation..."
)
def run_differential_evolution(constrs_list, constrs_test, polish=True):
ana = BasicAnalysis()
ana.pprint = pprint
assert (constrs_list is None) == (constrs_test is None)
global_method_kwargs = {
"global_method": "differential_evolution",
"options": {
"maxiter": 100,
"tol": 0.1,
"disp": False,
"seed": 0,
"polish": polish,
"constraints": deepcopy(constrs_list)
},
"local_fit_kwargs": None
}
glob_sett = {
"method": "scipy",
"method_kwargs": global_method_kwargs,
}
dm.reset_free()
dm.params.aeff_scale.randomize(random_state=0)
bf = ana.fit_recursively(
data_dist=data_dist,
hypo_maker=dm,
metric="chi2",
external_priors_penalty=None,
**glob_sett
)
if not bf.minimizer_metadata['success']:
raise RuntimeError("Minimizer unsuccessful")
if constrs_test is not None:
constrs_test(bf)
min_delta_index = 5e-3
max_aeff_scale = 0.986
# all solvers that accept constraints accept inequalities
constrs_list = [
{'type': 'ineq',
'fun': lambda params: params.delta_index.m_as("dimensionless") - min_delta_index},
{'type': 'ineq',
'fun': 'lambda p: -p.aeff_scale.m_as("dimensionless") + %s' % max_aeff_scale}
]
def constrs_test(bf):
tol = 1e-5
if (not bf.params.delta_index.m_as('dimensionless') >= min_delta_index - tol or
not bf.params.aeff_scale.m_as('dimensionless') <= max_aeff_scale + tol):
raise ValueError("inequality constraint(s) violated!")
method_kwargs = {
"global_method": "basinhopping",
"options": {
"niter": 3,
"T": 1,
"seed": 0,
"stepsize": 0.1
}
}
# basinhopping w/ constraints
run_global_with_supported_local(method_kwargs, constrs_list, constrs_test)
# basinhopping w/o (skip)
#run_global_with_supported_local(method_kwargs)
method_kwargs = {
"global_method": "dual_annealing",
"options": {
"maxiter": 10,
"seed": 0
}
}
# dual annealing w/ constraints
run_global_with_supported_local(method_kwargs, constrs_list, constrs_test)
# dual annealing w/o (skip)
#run_global_with_supported_local(method_kwargs)
# DE with trust-constr (w/ constraints)
run_differential_evolution(constrs_list, constrs_test)
# DE with l-bfgs-b (w/o constraints) or without polishing
# entirely takes too long:
#run_differential_evolution(None, None, polish=True)
#run_differential_evolution(None, None, polish=False)
def run_global_min_with_constraints_from_file(fit_sett):
from pisa.utils.fileio import from_file
ana = BasicAnalysis()
ana.pprint = pprint
fit_sett = from_file(fit_sett)
dm.params.unfix("theta23")
dm.reset_free()
dm.params.randomize_free(random_state=0)
bf = ana.fit_recursively(
data_dist=data_dist,
hypo_maker=dm,
metric="chi2",
external_priors_penalty=None,
**fit_sett
)
if not bf.minimizer_metadata['success']:
raise RuntimeError("Minimizer unsuccessful")
#run_global_min_with_constraints_from_file('settings/minimizer/de_2nd_octant_popsize_10_init_sobol_polish_tol1e-1_maxiter1000.json')
logging.info('<< PASS : test_global_scipy_minimization >>')
if __name__ == "__main__":
set_verbosity(1)
test_basic_analysis(pprint=True)
test_constrained_minimization(pprint=True)
test_global_scipy_minimization(pprint=True)