"""
Tools for working with hypersurfaces, which are continuous functions in N-D
with arbitrary functional forms.
Hypersurfaces can be used to model systematic uncertainties derived from discrete
simulation datasets, for example for detedctor uncertainties.
Note: to execute this module as a script, due to the relative import below, run
`python -m pisa.utils.hypersurface.hypersurface`
"""
__all__ = ['Hypersurface', 'HypersurfaceParam', 'fit_hypersurfaces',
'load_hypersurfaces']
__author__ = 'T. Stuttard, A. Trettin'
__license__ = '''Copyright (c) 2014-2017, The IceCube Collaboration
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.'''
import os
import copy
# Handle change over time in `collections` module
from collections import OrderedDict
from collections.abc import Mapping, Sequence # Required as of py3.10
import numpy as np
from iminuit import Minuit
from pisa import FTYPE, ureg
from pisa.utils.jsons import from_json, to_json
from pisa.core.pipeline import Pipeline
from pisa.core.binning import OneDimBinning, MultiDimBinning, is_binning
from pisa.core.map import Map
from pisa.core.param import Param, ParamSet
from pisa.utils.resources import find_resource
from pisa.utils.fileio import mkdir
from pisa.utils.log import logging, set_verbosity
from pisa.utils.comparisons import ALLCLOSE_KW
from uncertainties import ufloat, correlated_values
from uncertainties import unumpy as unp
from .hypersurface_plotting import plot_bin_fits, plot_bin_fits_2d
'''
Hypersurface functional forms
Define functional forms for HypersurfaceParam instances here.
Functions defined here MUST:
- Support numba guvectorization.
- Function arguments must observed this convention:
`p`, `<coefficient 0>`, ..., `<coefficient N>`, `out` where `p` is the
systematic parameter, `out is the array to write the results to, and there are
N coefficients of the parameterisation.
The format of these arguments depends on the use case, of which there are two:
- When fitting the function coefficients. This is done bin-wise using multiple
datasets.
- Params are then: `p` is array (one value per dataset), coefficients and `out`
are scalar (representing a single bin).
- Evaluating a fitted hypersurface. This is done for all bins simultaneously, using
a single value for p.
- Params are then: `p` is scalar (current value of systematic parameter,
coefficients and `out` are arrays representing the hypersurfaces of all bins
per bin.
'''
class linear_hypersurface_func(object):
'''
Linear hypersurface functional form
f(p) = m * p
'''
def __init__(self):
self.nargs = 1
def __call__(self, p, m, out):
result = m * p
np.copyto(src=result, dst=out)
def grad(self, p, m, out):
# because m itself is not in the actual calculation, we have to broadcast
# manually to yield the same shape as if we had done m*p and added one axis
foo = m*p
result = np.broadcast_to(p, foo.shape)[..., np.newaxis]
np.copyto(src=result, dst=out)
class quadratic_hypersurface_func(object):
'''
Quadratic hypersurface functional form
f(p) = m1*p + m2*p**2
'''
def __init__(self):
self.nargs = 2
def __call__(self, p, m1, m2, out):
result = m1*p + m2*p**2
np.copyto(src=result, dst=out)
# the gradient *must* have all these arguments, even if they are un-used!
def grad(self, p, m1, m2, out):
# because m itself is not in the actual calculation, we have to broadcast
# manually to yield the same shape as if we had done m*p and stacked on the last
# axis
foo = m1*p
result = np.stack([np.broadcast_to(p, foo.shape),
np.broadcast_to(p**2, foo.shape)],
axis=-1
)
np.copyto(src=result, dst=out)
class exponential_hypersurface_func(object):
'''
Exponential hypersurface functional form
f(p) = exp(b*p) - 1
The functional form ensures that it is zero at the nominal point.
'''
def __init__(self):
self.nargs = 1
def __call__(self, p, b, out):
result = np.exp(b*p) - 1.
np.copyto(src=result, dst=out)
def grad(self, p, b, out):
# because parameters and coefficients both appear, everything is broadcast
# automatically
result = np.array([p*np.exp(b*p)])[..., np.newaxis]
np.copyto(src=result, dst=out)
class scaled_exponential_hypersurface_func(object):
'''
Exponential hypersurface functional form
f(p) = (a + 1) * (exp(b*p) - 1)
The functional form is chosen such that it is zero at the nominal point.
If a strong prior is imposed on a, it becomes equivalent to the un-scaled
exponential hypersurface function.
'''
def __init__(self):
self.nargs = 2
def __call__(self, p, a, b, out):
result = (a + 1.) * (np.exp(b*p) - 1.)
np.copyto(src=result, dst=out)
def grad(self, p, a, b, out):
# because parameters and coefficients both appear, everything is broadcast
# automatically
result = np.stack([np.exp(b*p) - 1., (a + 1.)*p*np.exp(b*p)], axis=-1)
np.copyto(src=result, dst=out)
class logarithmic_hypersurface_func(object):
'''
Logarithmic hypersurface functional form
f(p) = log(1 + mp)
Allows the fit of an effectively linear multiplicative
function while in logmode, since:
exp(log(1 + mp) + h) = (1 + mp) exp(h)
'''
def __init__(self):
self.nargs = 1
def __call__(self, p, m, out):
result = np.log(1 + m*p)
np.copyto(src=result, dst=out)
def grad(self, p, m, out):
# because parameters and coefficients both appear, everything is broadcast
# automatically
result = np.array(p/(1 + m*p))[..., np.newaxis]
np.copyto(src=result, dst=out)
# Container holding all possible functions
HYPERSURFACE_PARAM_FUNCTIONS = OrderedDict()
HYPERSURFACE_PARAM_FUNCTIONS["linear"] = linear_hypersurface_func
HYPERSURFACE_PARAM_FUNCTIONS["quadratic"] = quadratic_hypersurface_func
HYPERSURFACE_PARAM_FUNCTIONS["exponential"] = exponential_hypersurface_func
HYPERSURFACE_PARAM_FUNCTIONS["exponential_scaled"] = scaled_exponential_hypersurface_func
HYPERSURFACE_PARAM_FUNCTIONS["logarithmic"] = logarithmic_hypersurface_func
[docs]
class Hypersurface(object):
'''
A class defining the hypersurface
Contains :
- A single common intercept
- N systematic parameters, inside which the functional form is defined
This class can be configured to hold both the functional form of the hypersurface
and values (likely fitted from simulation datasets) for the free parameters of this
functional form.
Fitting functionality is provided to fit these free parameters.
This class can simultaneously hold hypersurfaces for every bin in a histogram (Map).
The functional form of the systematic parameters can be arbitrarily complex.
The class has a fit method for fitting the hypersurface to some data (e.g.
discrete systematics sets).
Serialization functionality is included to allow fitted hypersurfaces to be stored
to a file and re-loaded later (e.g. to be used in analysis).
The main use cases are:
1) Fit hypersurfaces
- Define the desired HypersurfaceParams (functional form, intial coefficient guesses).
- Instantiate the `Hypersurface` class, providing the hypersurface params and initial intercept guess.
- Use `Hypersurface.fit` function (or more likely the `fit_hypersurfaces` helper function provided below),
to fit the hypersurface coefficients to some provided datasets.
- Store to file
2) Evaluate an existing hypersurface
- Load existing fitted Hypersurface from a file (`load_hypersurfaces` helper function)
- Get the resulting hypersurface value for each bin for a given set of systemaic param
values using the `Hypersurface.evaluate` method.
- Use the hypersurface value for each bin to re-weight events
The class stores information about the datasets used to fit the hypersurfaces, including the Maps
used and nominal and systematic parameter values.
Parameters
----------
params : list
A list of HypersurfaceParam instances defining the hypersurface.
The `initial_fit_coeffts` values in this instances will be used as the starting
point for any fits.
initial_intercept : float
Starting point for the hypersurface intercept in any fits
log : bool, optional
Set hypersurface to log mode. The surface is fit to the log of the bin counts.
The fitted surface is exponentiated during evaluation. Default: False
'''
def __init__(self, params, initial_intercept=None, log=False):
# Store args
self.initial_intercept = initial_intercept
# Store params as dict for ease of lookup
self.params = OrderedDict()
for param in params:
assert param.name not in self.params, "Duplicate param name found : %s" % param.name
self.params[param.name] = param
self.log = log
# Internal state
self._initialized = False
# Containers for storing fitting information
self.fit_complete = False
self.fit_info_stored = False
self.fit_maps_norm = None
self.fit_maps_smooth = None
self.fit_maps_raw = None
self.fit_chi2 = None
self.fit_cov_mat = None
self.fit_method = None
# Also add option store the pipeline param values used to generate the
# maps that are the inouts to the fits for these hypersurfaces. They are
# not actually used in the fit and so this variable is generally `None`,
# but a user can set them externally during the fitting process so that
# they can be stored for for future reference
self.fit_pipeline_param_values = None
# Serialization
self._serializable_state = None
# Legacy handling
self.using_legacy_data = False
def _init(self, binning, nominal_param_values):
'''
Actually initialise the hypersurface.
Internal function, not to be called by a user.
'''
#
# Binning
#
# Store the binning
self.binning = binning
# Set a default initial intercept value if none provided
if self.initial_intercept is None:
self.initial_intercept = 0. if self.log else 1.
# Create the fit coefficient arrays
# Have one fit per bin
self.intercept = np.full(
self.binning.shape, self.initial_intercept, dtype=FTYPE)
self.intercept_sigma = np.full_like(self.intercept, np.nan)
for param in list(self.params.values()):
param._init_fit_coefft_arrays(self.binning)
#
# Nominal values
#
# Store the nominal param values
# TODO better checks, including not already set
for param in list(self.params.values()):
param.nominal_value = nominal_param_values[param.name]
#
# Done
#
self._initialized = True
@property
def initialized(self):
'''
Return flag indicating if hypersurface has been initialized
Not giving use direct write-access to the variable as they should nt be setting it themselves
'''
return self._initialized
@property
def param_names(self):
'''
Return the (ordered) names of the systematic parameters
'''
return list(self.params.keys())
[docs]
def evaluate(self, param_values, bin_idx=None, return_uncertainty=False):
'''
Evaluate the hypersurface, using the systematic parameter values provided.
Uses the current internal values for all functional form coefficients.
Parameters
----------
param_values : dict
A dict specifying the values of the systematic parameters to use in the evaluation.
Format is :
{ sys_param_name_0 : sys_param_0_val, ..., sys_param_name_N : sys_param_N_val }.
The keys must be string and correspond to the HypersurfaceParam instances.
The values must be scalars.
bin_idx : tuple or None
Optionally can specify a particular bin (using numpy indexing). d
Othewise will evaluate all bins.
return_uncertainty : bool, optional
return the uncertainty on the output (default: False)
'''
assert self._initialized, "Cannot evaluate hypersurface, it haas not been initialized"
#
# Check inputs
#
# Determine number of sys param values (per sys param)
# This will be >1 when fitting, and == 1 when evaluating the hypersurface within the stage
num_param_values = np.asarray(list(param_values.values())[0]).size
# Check same number of values for all sys params
for k, v in list(param_values.items()):
n = np.asarray(v).size
assert n == num_param_values, "All sys params must have the same number of values"
# Determine whether using single bin or not
single_bin_mode = bin_idx is not None
#
# Prepare output array
#
# Determine shape of output array
# Two possible cases, with limitations on both based on how the sys param functional forms are defined
if not single_bin_mode:
# Case 1 : Calculating for all bins simultaneously (e.g. `bin_idx is None`)
# Only support a single scalar value for each systematic parameters
# Use case is evaluating the hypersurfaces during the hypersurface stage
assert num_param_values == 1, "Can only provide one value per sys param when evaluating all bins simultaneously"
for v in list(param_values.values()):
assert np.isscalar(
v), "sys param values must be a scalar when evaluating all bins simultaneously"
out_shape = self.binning.shape
bin_idx = Ellipsis
else:
# Case 2 : Calculating for multiple sys param values, but only a single bin
# Use case is fitting the hypersurfaces fucntional form fit params
out_shape = (num_param_values,)
# Create the output array
out = np.full(out_shape, np.nan, dtype=FTYPE)
#
# Evaluate the hypersurface
#
# Start with the intercept
for i in range(num_param_values):
if single_bin_mode:
out[i] = self.intercept[bin_idx]
else:
np.copyto(src=self.intercept[bin_idx], dst=out[bin_idx])
# Evaluate each individual parameter
for k, p in list(self.params.items()):
param_val = param_values[k] if self.using_legacy_data else param_values[k] - p.nominal_value
p.evaluate(param_val, out=out, bin_idx=bin_idx)
output_factors = np.exp(out) if self.log else out
if return_uncertainty:
# create buffer array for the gradients
n_coeffs = 1 # start with 1 because intercept is an additional coefficient
for param in list(self.params.values()):
n_coeffs += param.num_fit_coeffts
gradient_buffer = np.full(
out_shape + (n_coeffs,), np.nan, dtype=FTYPE)
# Start with the intercept, its gradient is always 1
gradient_buffer[..., 0] = 1.
# Evaluate gradient each individual parameter and store in buffer.
i = 1 # start at one because the intercept was already treated
for k, p in list(self.params.items()):
gbuf = np.full(out_shape + (p.num_fit_coeffts,),
np.nan, dtype=FTYPE)
param_val = param_values[k] if self.using_legacy_data else param_values[k] - p.nominal_value
p.gradient(param_val, out=gbuf, bin_idx=bin_idx)
for j in range(p.num_fit_coeffts):
gradient_buffer[..., i] = gbuf[..., j]
i += 1
# In log-mode, the output is exponentiated. For the gradient this simply means multiplying
# with the output itself.
if self.log:
gradient_buffer = output_factors[...,
np.newaxis]*gradient_buffer
# Calculate uncertainty from gradients and covariance matrix
transformed_jacobian = np.einsum(
'...j,...kj->...k', gradient_buffer, self.fit_cov_mat[bin_idx])
variance = np.einsum(
'...j,...j', transformed_jacobian, gradient_buffer)
assert np.all(variance[np.isfinite(variance)] >= 0.), "invalid covariance"
if return_uncertainty:
return output_factors, np.sqrt(variance)
else:
return output_factors
[docs]
def fit(self, nominal_map, nominal_param_values, sys_maps, sys_param_values,
norm=True, method="L-BFGS-B", fix_intercept=False, intercept_bounds=None,
intercept_sigma=None, include_empty=False, keep_maps=True, ref_bin_idx=None,
smooth_method=None, smooth_kw=None):
'''
Fit the hypersurface coefficients (in every bin) to best match the provided
nominal and systematic datasets.
Writes the results directly into this data structure.
Parameters
----------
nominal_map : Map
Map from the nominal dataset
nominal_param_values : dict
Value of each systematic param used to generate the nominal dataset
Format: { param_0_name : param_0_nom_val, ..., param_N_name : param_N_nom_val }
sys_maps : list of Maps
List containing the Map from each systematic dataset
sys_param_values : list of dicts
List where each element if a dict containing the values of each systematic
param used to generate the that dataset Each list element specified the
parameters for the corresponding element in `sys_maps`
norm : bool
Normalise the maps to the nominal map. This is what you want to do when
using the hypersurface to re-weight simulation (which is the main use case).
In principal the hypersurfaces are more general though and could be used for
other tasks too, hence this option.
method : str
`method` arg to pass to `scipy.optimize.minimiza`
fix_intercept : bool
Fix intercept to the initial intercept.
intercept_bounds : 2-tuple, optional
Bounds on the intercept. Default is None (no bounds)
include_empty : bool
Include empty bins in the fit. If True, empty bins are included with value 0
and sigma 1.
Default: False
keep_maps : bool
Keep maps used to make the fit. If False, maps will be set to None after
the fit is complete. This helps to reduce the size of JSONS if the
Hypersurface is to be stored on disk.
ref_bin_idx : tuple
An index specifying a reference bin that will be used for logging
'''
#
# Check inputs
#
# Check nominal dataset definition
assert isinstance(nominal_map, Map)
assert isinstance(nominal_param_values, Mapping)
assert set(nominal_param_values.keys()) == set(self.param_names), f"Params mismatch : {set(nominal_param_values.keys())} != {set(self.param_names)}"
assert all([isinstance(k, str) for k in nominal_param_values.keys()])
assert all([np.isscalar(v) for v in nominal_param_values.values()])
# Check systematic dataset definitions
assert isinstance(sys_maps, Sequence)
assert isinstance(sys_param_values, Sequence)
assert len(sys_maps) == len(sys_param_values)
for sys_map, sys_param_vals in zip(sys_maps, sys_param_values):
assert isinstance(sys_map, Map)
assert isinstance(sys_param_vals, Mapping)
msg = f"self.param_names: {self.param_names}\n sys_param_vals.keys(): {sys_param_vals.keys()}"
assert set(sys_param_vals.keys()) == set(self.param_names), msg
assert all([isinstance(k, str) for k in sys_param_vals.keys()])
assert all([np.isscalar(v) for v in sys_param_vals.values()])
assert sys_map.binning == nominal_map.binning
assert not (
include_empty and self.log), "empty bins cannot be included in log mode"
#
# Format things before getting started
#
# Store the fitting method
self.fit_method = method
# Store smoothing info
self.smooth_method = smooth_method
self.smooth_kw = smooth_kw
# Initialise hypersurface using nominal dataset
self._init(binning=nominal_map.binning,
nominal_param_values=nominal_param_values)
# Combine nominal and sys sets
maps = [nominal_map] + sys_maps
param_values = [nominal_param_values] + sys_param_values
# Store raw maps
self.fit_maps_raw = maps
self.fit_info_stored = True
# Convert params values from `list of dicts` to `dict of lists`
param_values_dict = {name: np.array([p[name] for p in param_values])
for name in list(param_values[0].keys())}
# Save the param values used for fitting in the param objects (useful for plotting later)
for name, values in list(param_values_dict.items()):
self.params[name].fit_param_values = values
# Format the fit `x` values : [ [sys param 0 values], [sys param 1 values], ... ]
# Order of the params must match the order in `self.params`
x = np.asarray([param_values_dict[param_name]
for param_name in list(self.params.keys())], dtype=FTYPE)
# Prepare covariance matrix array
self.fit_cov_mat = np.full(
list(self.binning.shape)+[self.num_fit_coeffts, self.num_fit_coeffts], np.nan)
#
# Smoothing
#
self.fit_maps_smooth = None
if self.smooth_method is not None :
raise Exception("Hypersurface smoothing needs some fixing")
fit_maps_smooth = []
if self.smooth_method == "gaussian_filter" :
#
# Perform Gaussian filtering on the input maps
#
#TODO REMOVE
#TODO REMOVE
#TODO REMOVE
#TODO REMOVE
#TODO REMOVE
print(">>>> STARTED gaussian_filter SMOOTHING")
#TODO REMOVE
#TODO REMOVE
#TODO REMOVE
#TODO REMOVE
if self.smooth_kw is None :
self.smooth_kw = {}
# Treating each PID bin individually as a 2D hist (E, coszen)
#TODO Make more general
#TODO Can smooth in 3 dims here if desired with gaussian_filter (I think...)
split_dim = "pid"
assert split_dim in self.binning
# Loop over maps and apply filter
for m in self.fit_maps :
fit_maps_smooth.append( m.gaussian_filter(split_dim=split_dim, **self.smooth_kw) )
else :
raise Exception(f"Unknown smooting method : {self.smooth_method}")
# Store
self.fit_maps_smooth = fit_maps_smooth
#
# Normalisation
#
# All map values are finite, but if have empty bins the nominal map will end up
# with inf bins in the normalised map (divide by zero). Use a mask to handle
# this.
finite_mask = nominal_map.nominal_values != 0
# Also include any binning mask in the finite mask (since these bin will be NaN)
if self.binning.mask is not None :
finite_mask = finite_mask & self.binning.mask
# Normalise bin values, if requested
if norm:
# Normalise the maps by dividing the nominal map This means the hypersurface
# results can be interpretted as a re-weighting factor, relative to the
# nominal
# Formalise, handling inf values
fit_maps_norm = []
for m in self.fit_maps:
norm_m = copy.deepcopy(m)
norm_m.hist[finite_mask] = norm_m.hist[finite_mask] / \
unp.nominal_values(nominal_map.hist[finite_mask])
norm_m.hist[~finite_mask] = ufloat(np.nan, np.nan)
fit_maps_norm.append(norm_m)
self.fit_maps_norm = fit_maps_norm
#
# Some final checks
#
# Not expecting any bins to have negative values (negative counts doesn't make
# sense)
# TODO hypersurface in general could consider -ve values (not explicitly
# tied to histograms), so maybe can relax this constraint
for m in self.fit_maps:
assert np.all(m.nominal_values[finite_mask]
>= 0.), "Found negative bin counts"
#
# Loop over bins
#
for bin_idx in np.ndindex(self.binning.shape): # TODO grab from input map
# Check if this bin is masked
if (self.binning.mask is not None) and (self.binning.mask[bin_idx] == False) :
logging.debug("Skipping masked bin {bin_idx}")
p0_intercept = self.intercept[bin_idx]
p0_param_coeffts = [param.get_fit_coefft(bin_idx=bin_idx, coefft_idx=i_cft)
for param in list(self.params.values())
for i_cft in range(param.num_fit_coeffts)]
if fix_intercept:
p0 = np.array(p0_param_coeffts, dtype=FTYPE)
else:
p0 = np.array([p0_intercept] + p0_param_coeffts, dtype=FTYPE)
# Not fitting, add empty variables
popt = np.full_like(p0, np.nan)
pcov = np.nan
else :
# Otherwise proceed to fitting...
#
# Format this bin's data for fitting
#
# Format the fit `y` values : [ bin value 0, bin_value 1, ... ]
# Also get the corresonding uncertainty
y = np.asarray([m.nominal_values[bin_idx]
for m in self.fit_maps], dtype=FTYPE)
y_sigma = np.asarray([m.std_devs[bin_idx]
for m in self.fit_maps], dtype=FTYPE)
# Create a mask for keeping all these points
# May remove some points before fitting if find issues
scan_point_mask = np.ones(y.shape, dtype=bool)
# Cases where we have a y_sigma element = 0 (normally because the
# corresponding y element = 0) screw up the fits (least squares divides by
# sigma, so get infs) By default, we ignore empty bins. If the user wishes
# to include them, it can be done with a value of zero and standard
# deviation of 1.
bad_sigma_mask = y_sigma == 0.
if bad_sigma_mask.sum() > 0:
if include_empty:
y_sigma[bad_sigma_mask] = 1.
else:
scan_point_mask = scan_point_mask & ~bad_sigma_mask
# Apply the mask to get the values I will actually use
x_to_use = np.array([xx[scan_point_mask] for xx in x])
y_to_use = y[scan_point_mask]
y_sigma_to_use = y_sigma[scan_point_mask]
# Checks
assert x_to_use.shape[0] == len(self.params)
assert x_to_use.shape[1] == y_to_use.size
# Get flat list of the fit param guesses
# The param coefficients are ordered as [ param 0 cft 0, ..., param 0 cft N,
# ..., param M cft 0, ..., param M cft N ]
p0_intercept = self.intercept[bin_idx]
p0_param_coeffts = [param.get_fit_coefft(bin_idx=bin_idx, coefft_idx=i_cft)
for param in list(self.params.values())
for i_cft in range(param.num_fit_coeffts)]
if fix_intercept:
p0 = np.array(p0_param_coeffts, dtype=FTYPE)
else:
p0 = np.array([p0_intercept] + p0_param_coeffts, dtype=FTYPE)
#
# Check if have valid data in this bin
#
# If have empty bins, cannot fit In particular, if the nominal map has an
# empty bin, it cannot be rescaled (x * 0 = 0) If this case, no need to try
# fitting
# Check if have NaNs/Infs
if np.any(~np.isfinite(y_to_use)): # TODO also handle missing sigma
# Not fitting, add empty variables
popt = np.full_like(p0, np.nan)
pcov = np.nan
# Otherwise, fit...
else:
#
# Fit
#
# Must have at least as many sets as free params in fit or else curve_fit will fail
assert y.size >= p0.size, "Number of datasets used for fitting (%i) must be >= num free params (%i)" % (
y.size, p0.size)
# Define a callback function for use with `curve_fit`
# x : sys params
# p : func/shape params
def callback(x, *p):
# Note that this is using the dynamic variable `bin_idx`, which
# cannot be passed as an arg as `curve_fit` cannot handle fixed
# parameters.
#
# Unflatten list of the func/shape params, and write them to the
# hypersurface structure
self.intercept[bin_idx] = self.initial_intercept if fix_intercept else p[0]
i = 0 if fix_intercept else 1
for param in list(self.params.values()):
for j in range(param.num_fit_coeffts):
bin_fit_idx = tuple(list(bin_idx) + [j])
param.fit_coeffts[bin_fit_idx] = p[i]
i += 1
# Unflatten sys param values
params_unflattened = OrderedDict()
for i in range(len(self.params)):
param_name = list(self.params.keys())[i]
params_unflattened[param_name] = x[i]
return self.evaluate(params_unflattened, bin_idx=bin_idx)
inv_param_sigma = []
if intercept_sigma is not None:
inv_param_sigma.append(1./intercept_sigma)
else:
inv_param_sigma.append(0.)
for param in list(self.params.values()):
if param.coeff_prior_sigma is not None:
for j in range(param.num_fit_coeffts):
inv_param_sigma.append(
1./param.coeff_prior_sigma[j])
else:
for j in range(param.num_fit_coeffts):
inv_param_sigma.append(0.)
inv_param_sigma = np.array(inv_param_sigma)
assert np.all(np.isfinite(
inv_param_sigma)), "invalid values found in prior sigma. They must not be zero."
# coefficient names to pass to Minuit. Not strictly necessary
coeff_names = [] if fix_intercept else ['intercept']
for name, param in self.params.items():
for j in range(param.num_fit_coeffts):
coeff_names.append(name + '_p{:d}'.format(j))
def loss(p):
'''
Loss to be minimized during the fit.
'''
fvals = callback(x_to_use, *p)
return np.sum(((fvals - y_to_use)/y_sigma_to_use)**2) + np.sum((inv_param_sigma*p)**2)
# Define fit bounds for `minimize`. Bounds are pairs of (min, max)
# values for each parameter in the fit. Use 'None' in place of min/max
# if there is
# no bound in that direction.
fit_bounds = []
if fix_intercept:
logging.debug("fixed intercept needs no bounds")
elif intercept_bounds is None:
fit_bounds.append(tuple([None, None]))
else:
assert (len(intercept_bounds) == 2) and (
np.ndim(intercept_bounds) == 1), "intercept bounds must be given as 2-tuple"
fit_bounds.append(intercept_bounds)
for param in self.params.values():
if param.bounds is None:
fit_bounds.extend(
((None, None),)*param.num_fit_coeffts)
else:
if np.ndim(param.bounds) == 1:
assert len(
param.bounds) == 2, "bounds on single coefficients must be given as 2-tuples"
fit_bounds.append(param.bounds)
elif np.ndim(param.bounds) == 2:
assert np.all([len(t) == 2 for t in param.bounds]
), "bounds must be given as a tuple of 2-tuples"
fit_bounds.extend(param.bounds)
# Define the EPS (step length) used by the fitter Need to take care with
# floating type precision, don't want to go smaller than the FTYPE being
# used by PISA can handle
eps = np.finfo(FTYPE).eps
# If no reference bin index was specified, used the first bin index to be fitted
if ref_bin_idx is None :
ref_bin_idx = bin_idx
# Debug logging
if bin_idx == ref_bin_idx:
msg = ">>>>>>>>>>>>>>>>>>>>>>>\n"
msg += "Curve fit inputs to bin %s :\n" % (bin_idx,)
msg += " x : \n%s\n" % x
msg += " y : \n%s\n" % y
msg += " y sigma : \n%s\n" % y_sigma
msg += " x used : \n%s\n" % x_to_use
msg += " y used : \n%s\n" % y_to_use
msg += " y sigma used: \n%s\n" % y_sigma_to_use
msg += " p0 : %s\n" % p0
msg += " bounds : \n%s\n" % fit_bounds
msg += " inv sigma : \n%s\n" % inv_param_sigma
msg += " fit method : %s\n" % self.fit_method
msg += "<<<<<<<<<<<<<<<<<<<<<<<"
logging.debug(msg)
# Perform fit
# errordef =1 for least squares fit and 0.5 for nllh fit
m = Minuit(loss, p0,
# only initial step size, not very important
# error=(0.1)*len(p0),
# limit=fit_bounds,
name=coeff_names)
m.errors = (0.1) * len(p0)
m.limits = fit_bounds
m.errordef = Minuit.LEAST_SQUARES
m.migrad()
m.hesse()
popt = np.array(m.values)
try:
pcov = np.atleast_1d(np.array(m.covariance))
except:
logging.warn(f"HESSE call failed for bin {bin_idx}, covariance matrix unavailable")
pcov = np.full((len(p0), len(p0)), np.nan)
if bin_idx == ref_bin_idx:
logging.debug(m.fmin)
logging.debug(m.params)
logging.debug(m.covariance)
#
# Re-format fit results
#
# Use covariance matrix to get uncertainty in fit parameters Using
# uncertainties.correlated_values, and will extract the std dev (including
# correlations) shortly Fit may fail to determine covariance matrix
# (method-dependent), so only do this if have a finite covariance matrix
corr_vals = correlated_values(popt, pcov) if np.all(
np.isfinite(pcov)) else None
# Write the fitted param results (and sigma, if available) back to the
# hypersurface structure
i = 0
if not fix_intercept:
self.intercept[bin_idx] = popt[i]
self.intercept_sigma[bin_idx] = np.nan if corr_vals is None else corr_vals[i].std_dev
i += 1
for param in list(self.params.values()):
for j in range(param.num_fit_coeffts):
idx = param.get_fit_coefft_idx(
bin_idx=bin_idx, coefft_idx=j)
param.fit_coeffts[idx] = popt[i]
param.fit_coeffts_sigma[idx] = np.nan if corr_vals is None else corr_vals[i].std_dev
i += 1
# Store the covariance matrix
if fix_intercept and np.all(np.isfinite(pcov)):
self.fit_cov_mat[bin_idx] = np.pad(pcov, ((1, 0), (1, 0)))
else:
self.fit_cov_mat[bin_idx] = pcov
#
# chi2
#
# Compare the result of the fitted hypersurface function with the actual data
# points used for fitting Compute the resulting chi2 to have an estimate of the
# fit quality
self.fit_chi2 = []
# Loop over datasets
for i_set in range(self.num_fit_sets):
# Get expected bin values according tohypersurface value
predicted = self.evaluate(
{name: values[i_set] for name, values in list(param_values_dict.items())})
# Get the observed value
observed = self.fit_maps[i_set].nominal_values
sigma = self.fit_maps[i_set].std_devs
# we have to apply the same condition on which values we include
# as we did during the fit above
with np.errstate(invalid='ignore'):
valid_idx = sigma > 0. # can be NaN
if include_empty:
sigma[~valid_idx] = 1.
# Compute chi2
with np.errstate(divide='ignore'):
chi2 = ((predicted - observed) / sigma) ** 2
# Add to container
self.fit_chi2.append(chi2)
# Combine into single array
self.fit_chi2 = np.stack(self.fit_chi2, axis=-1).astype(FTYPE)
# Drop input maps if not keeping them
if not keep_maps:
self.fit_maps_raw = None
self.fit_maps_smooth = None
self.fit_maps_norm = None
self.fit_info_stored = False
# Record some provenance info about the fits
self.fit_complete = True
@property
def nominal_values(self):
'''
Return the stored nominal parameter for each dataset
Returns: { param_0_name : param_0_nom_val, ..., param_N_name : param_N_nom_val }
'''
assert self.fit_info_stored, "Cannot get fit dataset nominal values, fit info not stored%s" % (
" (using legacy data)" if self.using_legacy_data else "")
return OrderedDict([(name, param.nominal_value) for name, param in list(self.params.items())])
@property
def fit_param_values(self):
'''
Return the stored systematic parameters from the datasets used for fitting
Returns: { param_0_name : [ param_0_sys_val_0, ..., param_0_sys_val_M ], ..., param_N_name : [ param_N_sys_val_0, ..., param_N_sys_val_M ] }
'''
assert self.fit_info_stored, "Cannot get fit dataset param values, fit info not stored%s" % (
" (using legacy data)" if self.using_legacy_data else "")
return OrderedDict([(name, param.fit_param_values) for name, param in list(self.params.items())])
[docs]
def get_nominal_mask(self):
'''
Return a mask indicating which datasets have nominal values for all parameters
'''
assert self.fit_info_stored, "Cannot get nominal mask, fit info not stored%s" % (
" (using legacy data)" if self.using_legacy_data else "")
nom_mask = np.ones((self.num_fit_sets,), dtype=bool)
for param in list(self.params.values()):
nom_mask = nom_mask & np.isclose(
param.fit_param_values, param.nominal_value)
return nom_mask
[docs]
def get_on_axis_mask(self, param_name):
'''
Return a mask indicating which datasets are "on-axis" for a given parameter.
"On-axis" means "generated using the nominal value for this parameter". Parameters other
than the one specified can have non-nominal values.
Parameters
----------
param_name : str
The name of systematic parameter for which we want on-axis datasets
'''
assert self.fit_info_stored, "Cannot get on-axis mask, fit info not stored%s" % (
" (using legacy data)" if self.using_legacy_data else "")
assert param_name in self.param_names
on_axis_mask = np.ones((self.num_fit_sets,), dtype=bool)
# Loop over sys params
for param in list(self.params.values()):
# Ignore the chosen param
if param.name != param_name:
# Define a "nominal" mask
on_axis_mask = on_axis_mask & np.isclose(
param.fit_param_values, param.nominal_value)
return on_axis_mask
[docs]
def report(self, bin_idx=None):
'''
Return a string version of the hypersurface contents
Parameters
----------
bin_idx : tuple of None
Specify a particular bin (using numpy indexing). In this case only report on that bin.
'''
msg = ""
# Fit results
msg += ">>>>>> Fit coefficients >>>>>>" + "\n"
bin_indices = np.ndindex(
self.binning.shape) if bin_idx is None else [bin_idx]
for bin_idx in bin_indices:
msg += " Bin %s :" % (bin_idx,) + "\n"
msg += " Intercept : %0.5g" % (self.intercept[bin_idx],) + "\n"
for param in list(self.params.values()):
msg += " %s : %s" % (param.name, ", ".join(["%0.5g" % param.get_fit_coefft(
bin_idx=bin_idx, coefft_idx=cft_idx) for cft_idx in range(param.num_fit_coeffts)])) + "\n"
msg += "<<<<<< Fit coefficients <<<<<<" + "\n"
return msg
def __str__(self):
return self.report()
@property
def fit_maps(self):
'''
Return the `Map instances used for fitting
These will be normalised if the fit was performend to normalised maps.
'''
assert self.fit_info_stored, "Cannot get fit maps, fit info not stored%s" % (
" (using legacy data)" if self.using_legacy_data else "")
# Return whatever the final processed map type was during the fitting process
if self.fit_maps_norm is not None :
return self.fit_maps_norm
elif self.fit_maps_smooth is not None :
return self.fit_maps_smooth
elif self.fit_maps_raw is not None :
return self.fit_maps_raw
else :
raise Exception("Cannot find fit maps")
@property
def num_fit_sets(self):
'''
Return number of datasets used for fitting
'''
assert self.fit_info_stored, "Cannot get fit sets, fit info not stored%s" % (
" (using legacy data)" if self.using_legacy_data else "")
return len(list(self.fit_param_values.values())[0])
@property
def num_fit_coeffts(self):
'''
Return the total number of coefficients in the hypersurface fit
This is the overall intercept, plus the coefficients for each individual param
'''
return int(1 + np.sum([param.num_fit_coeffts for param in list(self.params.values())]))
@property
def fit_coeffts(self):
'''
Return all coefficients, in all bins, as a single array
This is the overall intercept, plus the coefficients for each individual param
Dimensions are: [binning ..., fit coeffts]
'''
array = [self.intercept]
for param in list(self.params.values()):
for i in range(param.num_fit_coeffts):
array.append(param.get_fit_coefft(coefft_idx=i))
array = np.stack(array, axis=-1)
return array
@fit_coeffts.setter
def fit_coeffts(self, fit_coeffts):
'''
Setter to conveniently set the coefficients in the parameters
of the hypersurface in the same order in which they are also
returned by the getter.
'''
assert fit_coeffts.shape == self.fit_coeffts.shape, "incorrect shape of coefficients"
self.intercept = fit_coeffts[..., 0]
n = 1
for param in self.params.values():
for i in range(param.num_fit_coeffts):
idx = param.get_fit_coefft_idx(coefft_idx=i)
param.fit_coeffts[idx] = fit_coeffts[..., n]
n += 1
@property
def fit_coefft_labels(self):
'''
Return labels for each fit coefficient
'''
return ["intercept"] + ["%s p%i" % (param.name, i) for param in list(self.params.values()) for i in range(param.num_fit_coeffts)]
@property
def serializable_state(self):
"""
OrderedDict containing savable state attributes
"""
if self._serializable_state is None: # TODO always redo?
state = OrderedDict()
state["_initialized"] = self._initialized
state["binning"] = self.binning.serializable_state
state["initial_intercept"] = self.initial_intercept
state["log"] = self.log
state["intercept"] = self.intercept
state["intercept_sigma"] = self.intercept_sigma
state["fit_complete"] = self.fit_complete
state["fit_info_stored"] = self.fit_info_stored
state["fit_maps_norm"] = self.fit_maps_norm
state["fit_maps_smooth"] = self.fit_maps_smooth
state["fit_maps_raw"] = self.fit_maps_raw
state["fit_chi2"] = self.fit_chi2
state["fit_cov_mat"] = self.fit_cov_mat
state["fit_method"] = self.fit_method
state["fit_pipeline_param_values"] = self.fit_pipeline_param_values
state["using_legacy_data"] = self.using_legacy_data
state["params"] = OrderedDict()
for name, param in list(self.params.items()):
state["params"][name] = param.serializable_state
self._serializable_state = state
return self._serializable_state
[docs]
@classmethod
def from_state(cls, state):
"""
Instantiate a new object from the contents of a serialized state dict
Parameters
----------
resource : dict
A dict
See Also
--------
to_json
"""
#
# Get the state
#
# If it is not already a a state, alternativey try to load it in case a JSON
# file was passed
if not isinstance(state, Mapping):
state = from_json(state)
#
# Create params
#
params = []
# Loop through params in the state
params_state = state.pop("params")
for param_name, param_state in list(params_state.items()):
param = HypersurfaceParam.from_state(param_state)
params.append(param)
#
# Create hypersurface
#
# Instantiate
hypersurface = cls(
params=params,
initial_intercept=state.pop("initial_intercept"),
)
# Add binning
hypersurface.binning = MultiDimBinning(**state.pop("binning"))
# Add maps
fit_maps_raw = state.pop("fit_maps_raw")
hypersurface.fit_maps_raw = None if fit_maps_raw is None else [
Map(**map_state) for map_state in fit_maps_raw]
fit_maps_norm = state.pop("fit_maps_norm")
hypersurface.fit_maps_norm = None if fit_maps_norm is None else [
Map(**map_state) for map_state in fit_maps_norm]
fit_maps_smooth = state.pop("fit_maps_smooth") if "fit_maps_smooth" in state else None # Backwards compatibility
hypersurface.fit_maps_smooth = None if fit_maps_smooth is None else [
Map(**map_state) for map_state in fit_maps_smooth]
# Define rest of state
for k in list(state.keys()):
setattr(hypersurface, k, state.pop(k))
return hypersurface
[docs]
def fluctuate(self, random_state=None) :
'''
Return a new hypersurface object whose coefficients have been randomly fluctuated according
to the fit covariance matrix.
Used for testing the impact of statistical uncertainty in the hypersurfaces fits on
downstream analyses.
'''
#TODO uncorrelated fluctuation option
# Init random state
if random_state is None :
random_state = np.random.RandomState(12345) #TODO use PISA functions for this
# Create a copy of this instance
new_hypersurface = copy.deepcopy(self) #TODO Use serialized state instead?
# Loop over bins
for bin_idx in np.ndindex(self.binning.shape):
# Skip if this bin has no fits
if np.all(np.isfinite(self.fit_coeffts[bin_idx])) :
# Perform multivariate random sampling from the covariance matrix
# This gives new coefficients, which are written to the output hyersurface instance
new_fit_coeffts = random_state.multivariate_normal(self.fit_coeffts[bin_idx], self.fit_cov_mat[bin_idx])
# Set the values in the output hypersurface
new_hypersurface.intercept[bin_idx] = new_fit_coeffts[0]
n = 1
for param in new_hypersurface.params.values():
for i in range(param.num_fit_coeffts):
idx = param.get_fit_coefft_idx(bin_idx=bin_idx, coefft_idx=i)
param.fit_coeffts[idx] = new_fit_coeffts[n]
n += 1
return new_hypersurface
[docs]
class HypersurfaceParam(object):
'''
A class representing one of the parameters (and corresponding functional forms) in
the hypersurface.
A user creates the initial instances of thse params, before passing the to the
Hypersurface instance. Once this has happened, the user typically does not need to
directly interact woth these HypersurfaceParam instances.
Parameters
----------
name : str
Name of the parameter
func_name : str
Name of the hypersurface function to use.
See "Hypersurface functional forms" section for more details, including
available functions.
initial_fit_coeffts : array
Initial values for the coefficients of the functional form
Number and meaning of coefficients depends on functional form
bounds : 2-tuple of array_like, optional
Lower and upper bounds on independent variables. Defaults to no bounds. Each
element of the tuple must be either an array with the length equal to the number
of parameters, or a scalar (in which case the bound is taken to be the same for
all parameters.) Use ``np.inf`` with an appropriate sign to disable bounds on
all or some parameters.
coeff_prior_sigma : array, optional
Prior sigma values for the coefficients. If None (default), no regularization
will be applied during the fit.
'''
def __init__(self, name, func_name, initial_fit_coeffts=None, bounds=None, coeff_prior_sigma=None):
# Store basic members
self.name = name
# Handle functional form fit parameters
self.fit_coeffts = None # Fit params container, not yet populated
self.fit_coeffts_sigma = None # Fit param sigma container, not yet populated
# The initial values for the fit parameters
self.initial_fit_coeffts = initial_fit_coeffts
self.bounds = bounds
self.coeff_prior_sigma = coeff_prior_sigma
# Record information relating to the fitting
self.fitted = False # Flag indicating whether fit has been performed
# The values of this sys param in each of the fitting datasets
self.fit_param_values = None
# Placeholder for nominal value
self.nominal_value = None
# Serialization
self._serializable_state = None
self.binning_shape = None # initialized when used in Hypersurface
#
# Init the functional form
#
# Get the function
self.func_name = func_name
self._hypersurface_func = self._get_hypersurface_func(self.func_name)
# Get the number of functional form parameters
self.num_fit_coeffts = self._hypersurface_func.nargs
if self.coeff_prior_sigma is not None:
assert len(
self.coeff_prior_sigma) == self.num_fit_coeffts, "number of prior sigma values must equal the number of parameters."
# Check and init the fit param initial values
# TODO Add support for "per bin" initial values
if initial_fit_coeffts is None:
# No values provided, use 0 for all
self.initial_fit_coeffts = np.zeros(
self.num_fit_coeffts, dtype=FTYPE)
else:
# Use the provided initial values
self.initial_fit_coeffts = np.array(self.initial_fit_coeffts)
assert self.initial_fit_coeffts.size == self.num_fit_coeffts, "'initial_fit_coeffts' should have %i values, found %i" % (
self.num_fit_coeffts, self.initial_fit_coeffts.size)
def _get_hypersurface_func(self, func_name):
'''
Find the function defining the hypersurface functional form.
User specifies this by it's string name, which must correspond to a pre-defined
function in `HYPERSURFACE_PARAM_FUNCTIONS`.
Internal function, not to be called by a user.
'''
assert isinstance(func_name, str), "'func_name' must be a string"
assert func_name in HYPERSURFACE_PARAM_FUNCTIONS, "Cannot find hypersurface function '%s', choose from %s" % (
func_name, list(HYPERSURFACE_PARAM_FUNCTIONS.keys()))
return HYPERSURFACE_PARAM_FUNCTIONS[func_name]()
def _init_fit_coefft_arrays(self, binning):
'''
Create the arrays for storing the fit parameters
Have one fit per bin, for each parameter
The shape of the `self.fit_coeffts` arrays is: (binning shape ..., num fit params )
Internal function, not to be called by a user.
'''
arrays = []
self.binning_shape = binning.shape
for fit_coefft_initial_value in self.initial_fit_coeffts:
fit_coefft_array = np.full(
self.binning_shape, fit_coefft_initial_value, dtype=FTYPE)
arrays.append(fit_coefft_array)
self.fit_coeffts = np.stack(arrays, axis=-1)
self.fit_coeffts_sigma = np.full_like(self.fit_coeffts, np.nan)
[docs]
def evaluate(self, param, out, bin_idx=None):
'''
Evaluate the functional form for the given `param` values.
Uses the current values of the fit coefficients.
By default evaluates all bins, but optionally can specify a particular bin (used
when fitting).
'''
# Create an array to fill with this contribution
this_out = np.full_like(out, np.nan, dtype=FTYPE)
# Form the arguments to pass to the functional form
# Need to be flexible in terms of the number of fit parameters
args = [param]
for cft_idx in range(self.num_fit_coeffts):
args += [self.get_fit_coefft(bin_idx=bin_idx, coefft_idx=cft_idx)]
args += [this_out]
# Call the function
self._hypersurface_func(*args)
# Add to overall hypersurface result
out += this_out
[docs]
def gradient(self, param, out, bin_idx=None):
'''
Evaluate gradient of the functional form for the given `param` values.
Uses the current values of the fit coefficients.
By default evaluates all bins, but optionally can specify a particular bin (used when fitting).
'''
# Create an array to fill with the gradient
this_out = np.full_like(out, np.nan, dtype=FTYPE)
# Form the arguments to pass to the functional form
# Need to be flexible in terms of the number of fit parameters
args = [param]
for cft_idx in range(self.num_fit_coeffts):
args += [self.get_fit_coefft(bin_idx=bin_idx, coefft_idx=cft_idx)]
args += [this_out]
# Call the function
self._hypersurface_func.grad(*args)
# Copy to wherever the gradient is to be stored
np.copyto(src=this_out, dst=out)
[docs]
def get_fit_coefft_idx(self, bin_idx=None, coefft_idx=None):
'''
Indexing the fit_coefft matrix is a bit of a pain
This helper function eases things
'''
# TODO can probably do this more cleverly with numpy indexing, but works for now...
# Indexing based on the bin
if (bin_idx is Ellipsis) or (bin_idx is None):
idx = [Ellipsis]
else:
idx = list(bin_idx)
# Indexing based on the coefficent
if isinstance(coefft_idx, slice):
idx.append(coefft_idx)
elif coefft_idx is None:
idx.append(slice(0, -1))
else:
idx.append(coefft_idx)
# Put it all together
idx = tuple(idx)
return idx
[docs]
def get_fit_coefft(self, *args, **kwargs):
'''
Get a fit coefficient values from the matrix
Basically just wrapping the indexing function
'''
idx = self.get_fit_coefft_idx(*args, **kwargs)
return self.fit_coeffts[idx]
@property
def serializable_state(self):
"""
OrderedDict containing savable state attributes
"""
if self._serializable_state is None: # TODO always redo?
state = OrderedDict()
state["name"] = self.name
state["func_name"] = self.func_name
state["num_fit_coeffts"] = self.num_fit_coeffts
state["fit_coeffts"] = self.fit_coeffts
state["fit_coeffts_sigma"] = self.fit_coeffts_sigma
state["initial_fit_coeffts"] = self.initial_fit_coeffts
state["fitted"] = self.fitted
state["fit_param_values"] = self.fit_param_values
state["binning_shape"] = self.binning_shape
state["nominal_value"] = self.nominal_value
state["bounds"] = self.bounds
state["coeff_prior_sigma"] = self.coeff_prior_sigma
self._serializable_state = state
return self._serializable_state
[docs]
@classmethod
def from_state(cls, state):
# Define param init kwargs
# Special handling for `coeff_prior_sigma`, which was missing in older
# files (due to a bug in `serializable_state`) so need to handle this
# for backwards compatibility
param_init_kw = dict(
name=state.pop("name"),
func_name=state.pop("func_name"),
initial_fit_coeffts=state.pop("initial_fit_coeffts"),
bounds=state.pop("bounds"),
)
if "coeff_prior_sigma" in state :
param_init_kw["coeff_prior_sigma"] = state.pop("coeff_prior_sigma")
else :
param_init_kw["coeff_prior_sigma"] = None
# Create the param
param = cls(**param_init_kw)
# Define rest of state
for k in list(state.keys()):
setattr(param, k, state.pop(k))
return param
'''
Hypersurface fitting and loading helper functions
'''
def get_hypersurface_file_name(hypersurface, tag):
'''
Create a descriptive file name
'''
num_dims = len(hypersurface.params)
param_str = "_".join(hypersurface.param_names)
output_file = "%s__hypersurface_fits__%dd__%s.json" % (
tag, num_dims, param_str)
return output_file
[docs]
def fit_hypersurfaces(nominal_dataset, sys_datasets, params, output_dir, tag, combine_regex=None,
log=True, minimum_mc=0, minimum_weight=0, **hypersurface_fit_kw):
'''
A helper function that a user can use to fit hypersurfaces to a bunch of simulation
datasets, and save the results to a file. Basically a wrapper of Hypersurface.fit,
handling common pre-fitting tasks like producing mapsets from piplelines, merging
maps from similar specifies, etc.
Note that this supports fitting multiple hypersurfaces to the datasets, e.g. one per
simulated species. Returns a dict with format: { map_0_key : map_0_hypersurface,
..., map_N_key : map_N_hypersurface, }
Parameters
----------
nominal_dataset : dict
Definition of the nominal dataset. Specifies the pipleline with which the maps
can be created, and the values of all systematic parameters used to produced the
dataset.
Format must be:
nominal_dataset = {
"pipeline_cfg" = <pipeline cfg file (either cfg file path or dict)>),
"sys_params" = { param_0_name : param_0_value_in_dataset, ..., param_N_name : param_N_value_in_dataset }
}
Sys params must correspond to the provided HypersurfaceParam instances provided
in the `params` arg.
sys_datasets : list of dicts
List of dicts, where each dict defines one of the systematics datasets to be
fitted. The format of each dict is the same as explained for `nominal_dataset`
params : list of HypersurfaceParams
List of HypersurfaceParams instances that define the hypersurface. Note that
this defined ALL hypersurfaces fitted in this function, e.g. only supports a
single parameterisation for all maps (this is almost almost what you want).
output_dir : str
Path to directly to write results file in
tag : str
A string identifier that will be included in the file name to help you make
sense of the file in the future. Note that additional information on the
contents will be added to the file name by this function.
combine_regex : list of str, or None
List of string regex expressions that will be used for merging maps. Used to
combine similar species. Must be something that can be passed to the
`MapSet.combine_re` function (see that functions docs for more details). Choose
`None` is do not want to perform this merging.
minimum_mc : int, optional
Minimum number of unweighted MC events required in each bin. If the number
of unweighted MC events in a bin in any MC set is less than this number, the
value is set to exactly zero and will be excluded from the fit.
minimum_weight : float, optional
Minimum weight per bin. Bins with a total summed weight of less than this
number are excluded from the fit. Intended use is to exclude extremely small
values from KDE histograms that would pull the fit to zero.
hypersurface_fit_kw : kwargs
kwargs will be passed on to the calls to `Hypersurface.fit`
'''
# TODO Current yneed to manually ensure consistency between `combine_regex` here and
# the `links` param in `hypersurface` Need to make `hypersurface` directly use
# the value of `combine_regex` from the Hypersurface instance
#
# Make copies
#
# Take (deep) copies of lists/dicts to avoid modifying the originals
# Useful for cases where this function is called in a loop (e.g. leave-one-out tests)
nominal_dataset = copy.deepcopy(nominal_dataset)
sys_datasets = copy.deepcopy(sys_datasets)
params = copy.deepcopy(params)
#
# Check inputs
#
# Check types
assert isinstance(sys_datasets, Sequence)
assert isinstance(params, Sequence)
assert isinstance(output_dir, str)
assert isinstance(tag, str)
# Check formatting of datasets is as expected
all_datasets = [nominal_dataset] + sys_datasets
for dataset in all_datasets:
assert isinstance(dataset, Mapping)
assert "pipeline_cfg" in dataset
assert isinstance(dataset["pipeline_cfg"], (str, Mapping))
assert "sys_params" in dataset
assert isinstance(dataset["sys_params"], Mapping)
# Check params
assert len(params) >= 1
for p in params:
assert isinstance(p, HypersurfaceParam)
# Report inputs
msg = "Hypersurface fit details :"
msg += " Num params : %i" % len(params)
msg += " Num fit coefficients : %i" % sum(
[p.num_fit_coeffts for p in params])
msg += " Num datasets : 1 nominal + %i systematics" % len(
sys_datasets)
msg += " Nominal values : %s" % nominal_dataset["sys_params"]
logging.info(msg)
#
# Generate MapSets
#
def find_hist_stage(pipeline):
"""Locate the index of the hist stage in a pipeline."""
hist_idx_found = False
kde_idx_found = False
for i, s in enumerate(pipeline.stages):
if s.__class__.__name__ == "hist":
hist_idx = i
hist_idx_found = True
break
if s.__class__.__name__ == "kde":
hist_idx = i
kde_idx_found = True
break
if not hist_idx_found and not kde_idx_found:
raise RuntimeError("Could not find hist or kde stage in pipeline, aborting.")
return hist_idx, kde_idx_found
# Get maps and param values from nominal pipeline
nominal_pipeline = Pipeline(nominal_dataset["pipeline_cfg"])
logging.info("Nominal pipeline parameters:\n" + repr(nominal_pipeline.params))
pipeline_param_values = { p.name:p.value for p in nominal_pipeline.params }
nominal_dataset["mapset"] = nominal_pipeline.get_outputs() # return_sum=False)
# get the un-weighted event counts as well so that we can exclude bins
# with too little statistics
# First, find out which stage is the hist stage
hist_idx, is_kde = find_hist_stage(nominal_pipeline)
# minimum MC is only applicable to hist stage, not to KDE
if not is_kde:
nominal_pipeline.stages[hist_idx].unweighted = True
nominal_dataset["mapset_unweighted"] = nominal_pipeline.get_outputs()
else:
nominal_dataset["mapset_unweighted"] = None
# Bootstrapping is required to calculate errors on the histograms
assert nominal_pipeline.stages[hist_idx].bootstrap, (
"Hypersurfaces can only be fit to KDE histograms if bootstrapping is enabled."
)
del nominal_pipeline # Save memory
# Loop over sys datasets and grap the maps from them too
# Also make sure the pipeline params match the nominal pipeline (only the input file should differ between them)
for sys_dataset in sys_datasets:
sys_pipeline = Pipeline(sys_dataset["pipeline_cfg"])
for param in sys_pipeline.params :
assert param.value == pipeline_param_values[param.name], "Mismatch in pipeline param '%s' value between nominal and systematic pipelines : %s != %s" % (param.name, param.value, pipeline_param_values[param.name])
sys_dataset["mapset"] = sys_pipeline.get_outputs() # return_sum=False)
# get the un-weighted event counts as well so that we can exclude bins
# with too little statistics
# First, find out which stage is the hist stage
hist_idx, is_kde = find_hist_stage(sys_pipeline)
if not is_kde:
sys_pipeline.stages[hist_idx].unweighted = True
sys_dataset["mapset_unweighted"] = sys_pipeline.get_outputs()
else:
sys_dataset["mapset_unweighted"] = None
assert sys_pipeline.stages[hist_idx].bootstrap, (
"Hypersurfaces can only be fit to KDE histograms if bootstrapping is "
"enabled."
)
del sys_pipeline
# Merge maps according to the combine regex, if one was provided
if combine_regex is not None:
nominal_dataset["mapset"] = nominal_dataset["mapset"].combine_re(combine_regex)
if nominal_dataset["mapset_unweighted"] is not None:
nominal_dataset["mapset_unweighted"] = (
nominal_dataset["mapset_unweighted"].combine_re(combine_regex)
)
for sys_dataset in sys_datasets:
sys_dataset["mapset"] = sys_dataset["mapset"].combine_re(combine_regex)
if sys_dataset["mapset_unweighted"] is None: continue
sys_dataset["mapset_unweighted"] = (
sys_dataset["mapset_unweighted"].combine_re(combine_regex)
)
# Remove bins (i.e. set their count to zero) that have too few MC events or too little
# total weight
for dataset in sys_datasets + [nominal_dataset]:
for map_name in dataset["mapset"].names:
if dataset["mapset_unweighted"] is not None:
insuff_mc = dataset["mapset_unweighted"][map_name].nominal_values < minimum_mc
else:
insuff_mc = np.zeros(dataset["mapset"][map_name].nominal_values.shape, dtype=bool)
insuff_weight = dataset["mapset"][map_name].nominal_values < minimum_weight
# Setting the hist to zero sets both nominal value and std_dev to zero
dataset["mapset"][map_name].hist[insuff_mc | insuff_weight] = 0.
# TODO check every mapset has the same elements
#
# Loop over maps
#
# Create the container to fill
hypersurfaces = OrderedDict()
# Loop over maps
for map_name in nominal_dataset["mapset"].names:
#
# Prepare data for fit
#
nominal_map = nominal_dataset["mapset"][map_name]
nominal_param_values = nominal_dataset["sys_params"]
sys_maps = [sys_dataset["mapset"][map_name]
for sys_dataset in sys_datasets]
sys_param_values = [sys_dataset["sys_params"]
for sys_dataset in sys_datasets]
#
# Fit the hypersurface
#
# Create the hypersurface
hypersurface = Hypersurface(
params=copy.deepcopy(params), # Need the deepcopy, as want one set of params per map
initial_intercept=0. if log else 1., # Initial value for intercept
log=log
)
# Perform fit
hypersurface.fit(
nominal_map=nominal_map,
nominal_param_values=nominal_param_values,
sys_maps=sys_maps,
sys_param_values=sys_param_values,
norm=True,
**hypersurface_fit_kw
)
# Record the pipeline params used to generate the maps used for
# the fits, for data provenance purposes only
hypersurface.fit_pipeline_param_values = pipeline_param_values
# Report the results
logging.debug("\nFitted hypersurface report:\n%s" % hypersurface)
# Store for later write to disk
hypersurfaces[map_name] = hypersurface
#
# Store results
#
# Create a file name
output_path = os.path.join(output_dir, get_hypersurface_file_name(
list(hypersurfaces.values())[0], tag))
# Create the output directory
mkdir(output_dir)
# Write to a json file
to_json(hypersurfaces, output_path)
logging.info("Fit results written : %s" % output_path)
return output_path
[docs]
def load_hypersurfaces(input_file, expected_binning=None):
'''
User function to load file containing hypersurface fits, as written using `fit_hypersurfaces`.
Can be multiple hypersurfaces assosicated with different maps.
Returns a dict with the format: { map_0_key : map_0_hypersurface, ..., map_N_key : map_N_hypersurface, }
Handling the following input files cases:
1) Load files produced using this code (recommended)
2) Load files producing using older versions of PISA
3) Load public data releases csv formatted files
Parameters
----------
input_file : str
Path to the file contsaining the hypersurface fits.
For the special case of the datareleases these needs to be the path to all
relevent CSV fles, e.g. "<path/to/datarelease>/hyperplanes_*.csv".
expected_binning : One/MultiDimBinning
(Optional) Expected binning for hypersurface.
It will checked enforced that this mathes the binning found in the parsed
hypersurfaces. For certain legacy cases where binning info is not stored, this
will be assumed to be the actual binning.
'''
#
# Check inputs
#
assert isinstance(input_file, str)
if expected_binning is not None:
assert is_binning(expected_binning)
#
# PISA hypersurface files
#
logging.info(f"Loading non-interpolated hypersurfaces from file: {input_file}")
hypersurfaces = None
if input_file.endswith("json") or input_file.endswith("json.bz2"):
# Load file
input_data = from_json(input_file)
assert isinstance(input_data, Mapping)
logging.info(f"Reading file complete, generating hypersurfaces...")
# Testing various cases to support older files as well as modern ones...
if "sys_list" in input_data:
# Legacy case, create a modern hypersurface instance using old hyperplane fits
hypersurfaces = _load_hypersurfaces_legacy(input_data)
logging.warn("Old fit files detected, loaded via legacy mode")
else:
# Otherwise assume file is using the modern format
hypersurfaces = OrderedDict()
for map_name, hypersurface_state in list(input_data.items()):
hypersurfaces[map_name] = Hypersurface.from_state(
hypersurface_state)
#
# Public data release file
#
elif input_file.endswith("csv") or input_file.endswith("csv.bz2"):
hypersurfaces = _load_hypersurfaces_data_release(
input_file, expected_binning)
#
# Done
#
else:
raise Exception("Unknown file format : %s" % input_file)
# Check binning
if expected_binning is not None:
for hypersurface in hypersurfaces.values():
if not hypersurface.binning.hash == expected_binning.hash:
for a, b, in zip(hypersurface.binning.dims, expected_binning.dims):
assert a == b, "Incompatible binning dimension %s and %s"%(a, b)
logging.info(f"Generated hypersurfaces")
return hypersurfaces
def _load_hypersurfaces_legacy(input_data):
'''
Load an old hyperplane (not surface) fit file from older PISA version.
Put the results into an instance the new `Hypersurface` class so can use the
resulting hypersurface in modern code.
User should not use this directly, instead call `load_hypersurfaces`.
'''
hypersurfaces = OrderedDict()
#
# Loop over map names
#
for map_name in input_data["map_names"]:
#
# Create the params
#
# Get the param names
param_names = input_data["sys_list"]
# Create the param instances.
# Using linear functional forms (legacy files only supported linear forms, e.g.
# hyperplanes rather than surfaces).
params = [HypersurfaceParam(
name=name, func_name="linear", initial_fit_coeffts=None, ) for name in param_names]
#
# Get binning
#
# This varies depending on how old the file is...
# Note that the hypersurface class really only needs to know the binning
# shape (to create the coefficient arrays).
# If the (serialized version of the) binning is stored, great! Use it
if "binning" in input_data:
binning = MultiDimBinning(**input_data["binning"])
# If no binning is available, can at least get the correct shape (using
# one of the map arrays) and create a dummy binning instance.
# Remember that the final dimension is the sys params, not binning
else:
# Remove last dimension
binning_shape = input_data[map_name][..., 0].shape
binning = MultiDimBinning([OneDimBinning(name="dummy_%i" % i, domain=[
0., 1.], is_lin=True, num_bins=dim) for i, dim in enumerate(binning_shape)])
#
# Create the hypersurface instance
#
# Create the hypersurface
hypersurface = Hypersurface(
params=params, # Specify the systematic parameters
initial_intercept=1., # Intercept value (or first guess for fit)
)
# Set some internal members that would normally be configured during fitting
# Don't know the nominal values with legacy files, so just stores NaNs
hypersurface._init(
binning=binning,
nominal_param_values={
name: np.nan for name in hypersurface.param_names},
)
# Indicate this is legacy data (not all functionality will work)
hypersurface.using_legacy_data = True
#
# Get the fit values
#
# Handling two different legacy cases here...
fitted_coefficients = input_data["hyperplanes"][map_name][
"fit_params"] if "hyperplanes" in input_data else input_data[map_name]
# Fitted coefficients have following array shape: [ binning dim 0, ..., binning dim N, sys params (inc. intercept) ]
intercept_values = fitted_coefficients[..., 0]
sys_param_gradient_values = {
n: fitted_coefficients[..., i+1] for i, n in enumerate(param_names)}
# Write the values to the hypersurface
np.copyto(src=intercept_values, dst=hypersurface.intercept)
for param in hypersurface.params.values():
np.copyto(
src=sys_param_gradient_values[param.name], dst=param.fit_coeffts[..., 0])
# Done, store the hypersurface
hypersurfaces[map_name] = hypersurface
return hypersurfaces
def _load_hypersurfaces_data_release(input_file_prototype, binning):
'''
Load the hypersurface CSV files from an official IceCube data release
User should not use this directly, instead call `load_hypersurfaces`.
'''
# TODO Current only handles DRAGON (analysis B) data release (as was also the case for
# the older hyperplane code)
# TODO Would need to add support for muon hypersurface (including non-linear params)
# as well as a different binning
import pandas as pd
hypersurfaces = OrderedDict()
#
# Check inputs
#
assert binning is not None, "Must provide binning when loading data release hypersurfaces"
#
# Load CSV files
#
fit_results = {}
fit_results['nue_cc+nuebar_cc'] = pd.read_csv(find_resource(
input_file_prototype.replace('*', 'nue_cc')))
fit_results['numu_cc+numubar_cc'] = pd.read_csv(find_resource(
input_file_prototype.replace('*', 'numu_cc')))
fit_results['nutau_cc+nutaubar_cc'] = pd.read_csv(find_resource(
input_file_prototype.replace('*', 'nutau_cc')))
fit_results['nu_nc+nubar_nc'] = pd.read_csv(find_resource(
input_file_prototype.replace('*', 'all_nc')))
#
# Get hyperplane info
#
param_names = None
for map_name, map_fit_results in fit_results.items():
#
# Get hypersurface params
#
# Remove the bin info from the data frame (only want hyperplane params)
# Check that find the same dimensions as the expected binning
# TODO Also check bin centers are within expected bins
for n in binning.names:
midpoints_found = np.unique(map_fit_results.pop(n).values)
assert midpoints_found.size == binning[n].num_bins, "Mismatch between expected and actual binning dimensions"
# Also extract the special case of the offset
offset = map_fit_results.pop("offset")
# Get the param names (everything remaining is a hypersurface param)
if param_names is None:
param_names = map_fit_results.columns.tolist()
else:
assert param_names == map_fit_results.columns.tolist(
), "Mismatch between hypersurface params in different files"
# Create the params
params = [HypersurfaceParam(
name=name, func_name="linear", initial_fit_coeffts=None, ) for name in param_names]
#
# Create the hypersurface instance
#
# Create the hypersurface
hypersurface = Hypersurface(
params=params, # Specify the systematic parameters
initial_intercept=1., # Intercept value (or first guess for fit)
)
# Set some internal members that would normally be configured during fitting
# Don't know the nominal values with legacy files, so just stores NaNs
hypersurface._init(
binning=binning,
nominal_param_values={
name: np.nan for name in hypersurface.param_names},
)
# Indicate this is legacy data (not all functionality will work)
hypersurface.using_legacy_data = True
#
# Get the fit values
#
# Intercept
intercept_values = offset.values.reshape(binning.shape)
np.copyto(src=intercept_values, dst=hypersurface.intercept)
# Param gradients
for param in hypersurface.params.values():
sys_param_gradient_values = map_fit_results[param.name].values.reshape(
binning.shape)
np.copyto(src=sys_param_gradient_values,
dst=param.fit_coeffts[..., 0])
# Done, store the hypersurface
hypersurfaces[map_name] = hypersurface
return hypersurfaces
#
# Test/example
#
def generate_asimov_testdata(binning, parameters, true_param_coeffs,
nominal_param_values, sys_param_values,
error_scale=0.1, log=False, intercept=2.,
):
hypersurface = Hypersurface(
params=parameters, # Specify the systematic parameters
# Intercept value (or first guess for fit)
initial_intercept=intercept,
log=log,
)
assert set(hypersurface.params.keys()) == set(nominal_param_values.keys())
assert set(hypersurface.params.keys()) == set(true_param_coeffs.keys())
hypersurface._init(binning=binning, nominal_param_values=nominal_param_values)
from pisa.core.map import Map, MapSet
for bin_idx in np.ndindex(binning.shape):
for name, coeffs in true_param_coeffs.items():
assert len(coeffs) == hypersurface.params[name].num_fit_coeffts, ("number "
"of coefficients in the parameter must match")
for j, c in enumerate(coeffs):
idx = hypersurface.params[name].get_fit_coefft_idx(bin_idx=bin_idx,
coefft_idx=j,
)
hypersurface.params[name].fit_coeffts[idx] = c
logging.debug("Truth hypersurface report:\n%s" % str(hypersurface))
# Only consider one particle type for simplicity
particle_key = "nue_cc"
# Create each dataset, e.g. set the systematic parameter values, calculate a bin count
hist = hypersurface.evaluate(nominal_param_values)
assert np.all(hist >= 0.), ("nominal map has negative values! "
"Choose different true parameters.")
nom_map = Map(name=particle_key, binning=binning,
hist=hist, error_hist=np.sqrt(hist)*error_scale,
)
logging.debug("Nominal hist: \n%s" % str(nom_map.hist))
sys_maps = []
for i in range(len(sys_param_values)):
hist = hypersurface.evaluate(sys_param_values[i])
assert np.all(hist > 0.), ("a systematic map has negative values! values: "
"%s systematics: %s" % (str(hist), str(sys_param_values[i])))
sys_maps.append(Map(name=particle_key, binning=binning,
hist=hist, error_hist=np.sqrt(hist)*error_scale))
return nom_map, sys_maps
def test_hypersurface_uncertainty(plot=False):
'''
Simple test of hypersurface fits + uncertainty
1. Creates some Asimov test data matching a true hypersurface and checks the ability
to fit back the truth.
2. Fluctuates Asimov test data randomly to check uncertainties of hypersurface
'''
# Define systematic parameters in the hypersurface
params = [
HypersurfaceParam(name="foo", func_name="linear",
initial_fit_coeffts=[1.]),
HypersurfaceParam(name="bar", func_name="quadratic",
initial_fit_coeffts=[1., -1.]),
]
# Create the hypersurface
hypersurface = Hypersurface(
params=params, # Specify the systematic parameters
initial_intercept=1., # Intercept value (or first guess for fit)
log=False
)
# Define binning with one dummy bin
binning = MultiDimBinning([OneDimBinning(name="reco_energy",
domain=[0., 10.],
num_bins=1,
units=ureg.GeV,
is_lin=True
)])
# Define true coefficients
true_coeffs = {'foo': [-0.4], 'bar': [0.5, 1.]}
true_intercept = 5.
nominal_param_values = {'foo': 1., 'bar': 0.}
# making combinations of systematic values
foo_vals = np.linspace(-2., 2., 6)
bar_vals = np.linspace(-2, 1.5, 8)
sys_param_values = []
for f in foo_vals:
for b in bar_vals:
sys_param_values.append({'foo': f, 'bar': b})
nom_map, sys_maps = generate_asimov_testdata(binning,
params,
true_coeffs,
nominal_param_values,
sys_param_values,
intercept=true_intercept,
log=False,
error_scale=0.2,
)
# Perform fit
hypersurface.fit(
nominal_map=nom_map,
nominal_param_values=nominal_param_values,
sys_maps=sys_maps,
sys_param_values=sys_param_values,
norm=False,
)
# Report the results
logging.debug("Fitted hypersurface report:\n%s" % hypersurface)
assert np.allclose(hypersurface.intercept, true_intercept,
rtol=ALLCLOSE_KW['rtol']*10.)
for param_name in hypersurface.param_names:
assert np.allclose(hypersurface.params[param_name].fit_coeffts,
true_coeffs[param_name], rtol=ALLCLOSE_KW['rtol']*10.)
if plot:
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
plot_bin_fits(ax, hypersurface, bin_idx=[0], param_name='foo',
label='Asimov test map')
ax.grid()
plt.savefig('test_hypersurface_foo.pdf')
fig, ax = plt.subplots()
plot_bin_fits(ax, hypersurface, bin_idx=[0], param_name='bar',
label='Asimov test map')
ax.grid()
plt.savefig('test_hypersurface_bar.pdf')
# Evaluate hypersurface and uncertainties at some points
# that just happen to be the systematic values (but choice could be different)
asimov_true_points = []
asimov_fit_points = []
asimov_fit_errs = []
for i in range(len(sys_param_values)):
hist, errs = hypersurface.evaluate(
sys_param_values[i], return_uncertainty=True)
asimov_fit_points.append(hist)
asimov_fit_errs.append(errs)
asimov_true_points.append(sys_maps[i].nominal_values)
asimov_true_points = np.concatenate(asimov_true_points)
asimov_fit_points = np.concatenate(asimov_fit_points)
asimov_fit_errs = np.concatenate(asimov_fit_errs)
logging.debug("Asimov true points:\n%s" % str(asimov_true_points))
logging.debug("Asimov fit points:\n%s" % str(asimov_fit_points))
logging.debug("Asimov fit error estimates:\n%s" % str(asimov_fit_errs))
assert np.allclose(asimov_true_points, asimov_fit_points, rtol=ALLCLOSE_KW['rtol']*10.)
logging.debug("Fluctuating maps and re-fitting...")
# do several rounds of fluctuation, re-fit and storage of results
n_rounds = 50
fluctuated_fit_points = []
for i in range(n_rounds):
#logging.info("Round %d/%d" % (i+1, n_rounds))
nom_map_fluct = nom_map.fluctuate(method='gauss', random_state=i*n_rounds*len(sys_maps))
sys_maps_fluct = []
for j, s in enumerate(sys_maps):
sys_maps_fluct.append(s.fluctuate(method='gauss',
random_state=i*n_rounds*len(sys_maps)+j+1))
hypersurface.fit(
nominal_map=nom_map_fluct,
nominal_param_values=nominal_param_values,
sys_maps=sys_maps_fluct,
sys_param_values=sys_param_values,
norm=False,
)
fluctuated_fit_points.append([])
for j in range(len(sys_param_values)):
hist = hypersurface.evaluate(
sys_param_values[j], return_uncertainty=False)
fluctuated_fit_points[-1].append(hist)
fluctuated_fit_points[-1] = np.concatenate(fluctuated_fit_points[-1])
logging.trace("Fluctuated fit points:\n%s" %
str(fluctuated_fit_points[-1]))
# evaluate whether the actual fluctuations match the estimated errors
fluctuated_fit_points = np.array(fluctuated_fit_points)
fit_differences = fluctuated_fit_points - asimov_fit_points
all_pulls = fit_differences / asimov_fit_errs
avg_fit_differences = np.mean(fit_differences, axis=0)
std_pulls = np.std(all_pulls, axis=0)
logging.debug("Average fluctuated fit difference:\n%s" %
str(avg_fit_differences))
logging.debug("Mean pulls per point:\n%s" % str(std_pulls))
logging.debug("Mean pull: %.3f" % np.mean(std_pulls))
# mean of 0.906 (0.916) in double (single) precision at time test succeeded
# If the pseudoexperiments above are modified, this condition
# will not necessarily hold any longer
assert np.abs(np.mean(std_pulls) -
1.) < 0.1, "avg. pulls too far from expectation"
if plot:
plt.figure()
plt.hist(all_pulls.flatten(), bins=50,
density=True, label='fluctuated fits')
x_plot = np.linspace(-4, 4, 100)
plt.plot(x_plot, np.exp(-x_plot**2/2.) /
np.sqrt(2.*np.pi), label='expectation')
plt.title('pull distribution')
plt.xlabel('pull')
plt.ylabel('density')
plt.legend()
plt.savefig('test_hypersurface_pull.pdf')
logging.info('<< PASS : test_hypersurface_uncertainty >>')
def test_hypersurface_basics():
'''
Test basic fitting, inject/recover, storing and loading
'''
import tempfile
from pisa.core.map import Map
params = [HypersurfaceParam(name="foo", func_name="linear",
initial_fit_coeffts=[1.],
),
# the exponential HS function did not reliably recover injected true
# parameters, probably due to the degeneracy with the intercept.
HypersurfaceParam(name="bar", func_name="quadratic",
initial_fit_coeffts=[.1, .1],
),
]
# Create the hypersurface
hypersurface = Hypersurface(params=params, # Specify the systematic parameters
initial_intercept=1., # Intercept first guess for fit)
log=False,
)
binning = MultiDimBinning([OneDimBinning(name="reco_energy",
domain=[0., 10.],
num_bins=3,
units=ureg.GeV,
is_lin=True,
)])
# Define the values for the parameters for each dataset
nom_param_values = {}
sys_param_values_dict = {}
if "foo" in [p.name for p in params]:
nom_param_values["foo"] = 0.
sys_param_values_dict["foo"] = [0., 0., 0., -1., +1., 1.]
if "bar" in [p.name for p in params]:
nom_param_values["bar"] = 10.
sys_param_values_dict["bar"] = [20., 30., 0., 10., 10., 15.]
# Get number of datasets
num_sys_datasets = len(list(sys_param_values_dict.values())[0])
# Only consider one particle type for simplicity
particle_key = "nue_cc"
# Create a dummy "true" hypersurface that can be used to generate
# some fake bin values for the dataset
true_hypersurface = copy.deepcopy(hypersurface)
true_hypersurface._init(
binning=binning, nominal_param_values=nom_param_values)
true_hypersurface.intercept.fill(10.)
if "foo" in true_hypersurface.params:
true_hypersurface.params["foo"].fit_coeffts[..., 0].fill(2.)
if "bar" in true_hypersurface.params:
true_hypersurface.params["bar"].fit_coeffts[..., 0].fill(-.1)
true_hypersurface.params["bar"].fit_coeffts[..., 1].fill(0.05)
logging.debug("Truth hypersurface report:\n%s" % str(true_hypersurface))
# Create each dataset, e.g. set the systematic parameter values, calculate bin count
hist = true_hypersurface.evaluate(nom_param_values)
nom_map = Map(name=particle_key, binning=binning,
hist=hist, error_hist=np.sqrt(hist),
)
sys_maps = []
sys_param_values = []
for i in range(num_sys_datasets):
sys_param_values.append({name: sys_param_values_dict[name][i]
for name in list(true_hypersurface.params.keys())
})
hist = true_hypersurface.evaluate(sys_param_values[-1])
sys_maps.append(Map(name=particle_key, binning=binning,
hist=hist, error_hist=np.sqrt(hist),
)
)
# Perform fit
hypersurface.fit(nominal_map=nom_map,
nominal_param_values=nom_param_values,
sys_maps=sys_maps,
sys_param_values=sys_param_values,
norm=False,
)
logging.debug("Fitted hypersurface report:\n%s" % hypersurface)
# Check the fitted parameter values match the truth
# This only works if `norm=False` in the `hypersurface.fit` call just above
logging.debug("Checking fit recovered truth...")
assert np.allclose(hypersurface.intercept,
true_hypersurface.intercept, rtol=ALLCLOSE_KW['rtol']*10.)
for param_name in hypersurface.param_names:
assert np.allclose(hypersurface.params[param_name].fit_coeffts,
true_hypersurface.params[param_name].fit_coeffts,
rtol=ALLCLOSE_KW['rtol']*10.
)
logging.debug("... fit was successful!")
# testing save/reload
with tempfile.TemporaryDirectory() as tmpdirname:
file_name = "hypersurface.json.bz2"
file_path = os.path.join(tmpdirname, file_name)
to_json(hypersurface, file_path)
reloaded_hypersurface = Hypersurface.from_state(file_path)
logging.debug(
"Checking saved and re-loaded hypersurfaces are identical...")
assert np.allclose(hypersurface.intercept,
reloaded_hypersurface.intercept,
rtol=ALLCLOSE_KW['rtol']*10.
)
for param_name in hypersurface.param_names:
assert np.allclose(hypersurface.params[param_name].fit_coeffts,
reloaded_hypersurface.params[param_name].fit_coeffts,
rtol=ALLCLOSE_KW['rtol']*10.
)
logging.debug("... save+re-load was successful!")
# test getting and setting coefficients
coeffts = hypersurface.fit_coeffts
reloaded_hypersurface.fit_coeffts = coeffts
logging.debug(
"Checking hypersurfaces are identical after getting and setting coeffts...")
assert np.allclose(hypersurface.intercept, reloaded_hypersurface.intercept,
rtol=ALLCLOSE_KW['rtol']*10.)
for param_name in hypersurface.param_names:
assert np.allclose(hypersurface.params[param_name].fit_coeffts,
reloaded_hypersurface.params[param_name].fit_coeffts,
rtol=ALLCLOSE_KW['rtol']*10.)
logging.debug("... setting and getting coefficients was successful!")
logging.info('<< PASS : test_hypersurface_basics >>')
# Run the examp'es/tests
if __name__ == "__main__":
set_verbosity(2)
test_hypersurface_basics()
test_hypersurface_uncertainty()