#!/usr/bin/env python
"""
Define CrossSections class for importing, working with, and storing neutrino
cross sections
"""
from __future__ import absolute_import, division
from copy import deepcopy
import os
import numpy as np
from scipy.interpolate import interp1d
from pisa.utils.comparisons import recursiveEquality
from pisa.utils.fileio import expand, from_file, to_file
from pisa.utils.flavInt import ALL_NUFLAVINTS, FlavIntData, NuFlavIntGroup
from pisa.utils.log import logging, set_verbosity
from pisa.utils.resources import find_resource
__all__ = ['CrossSections', 'manual_test_CrossSections']
__author__ = 'J.L. Lanfranchi'
__license__ = '''Copyright (c) 2014-2017, The IceCube Collaboration
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.'''
# TODO: make class for groups of CX or just a function for finding eisting
# versions in a json file; eliminate the optional ver parameters in
# CrossSections class methods
[docs]
class CrossSections(FlavIntData):
"""Cross sections for each neutrino flavor & interaction type ("flavint").
What is stored are *PER-H2O-MOLECULE* cross sections, in units of [m^2].
Be careful when using these; cross sections are often plotted in the
literature as per-nucleon and per-energy with units [10^-38 cm^2 / GeV].
ver : string
Version of cross sections to load from file. E.g. 'genie_2.6.4'
energy : None or array of float
Energies at which cross sections are defined.
xsec : string, dictonary, or another CrossSections object
If str: provides PISA resource name from which to load cross sections
If dict or CrossSections object: construct cross sections from a
deepcopy of that object
"""
def __init__(self, ver=None, energy=None,
xsec='cross_sections/cross_sections.json'):
super().__init__()
self.energy = energy
self._ver = ver
self._interpolants = {}
if xsec is None:
pass
elif isinstance(xsec, dict):
xsec = deepcopy(xsec)
elif isinstance(xsec, str):
assert self.energy is None
self.energy, xsec = self.load(fpath=xsec, ver=ver)
else:
raise TypeError('Unhandled xsec type passed in arg: ' +
str(type(xsec)))
if xsec is not None:
super().validate(xsec)
self.validate_xsec(self.energy, xsec)
self.update(xsec)
self._define_interpolant()
[docs]
@staticmethod
def load(fpath, ver=None, **kwargs):
"""Load cross sections from a file locatable and readable by the PISA
from_file command. If `ver` is provided, it is used to index into the
top level of the loaded dictionary"""
all_xsec = from_file(fpath, **kwargs)
if ver not in all_xsec:
raise ValueError('Version "%s" not found. Valid versions in file'
'"%s" are: %s' % (ver, fpath, all_xsec.keys()))
return all_xsec[ver]['energy'], all_xsec[ver]['xsec']
[docs]
@staticmethod
def load_root_file(fpath, ver, tot_sfx='_tot', o_sfx='_o16', h_sfx='_h1',
plt_sfx='_plot'):
"""Load cross sections from root file, where graphs are first-level in
hierarchy. This is yet crude and not very flexible, but at least it's
recorded here for posterity.
Requires ROOT and ROOT python module be installed
Parameters
----------
fpath : string
Path to ROOT file
ver : string
Necessary to differentaite among different file formats that Ken
has sent out
tot_sfx : string (default = '_tot')
Suffix for finding total cross sections in ROOT file (if these
fields are found, the oxygen/hydrogen fields are skipped)
o_sfx : string (default = '_o16')
Suffix for finding oxygen-16 cross sections in ROOT file
h_sfx : string (default = '_h1')
Suffix for finding hydrogen-1 cross sections in ROOT file
plt_sfx : string (default = '_plt')
Suffix for plots containing cross sections per GeV in ROOT file
Returns
-------
xsec : :class:`pisa.utils.flavInt.FlavIntData`
Object containing the loaded cross sections
"""
import ROOT
def extractData(f, key):
"""Extract x and y info from (already-opened) ROOT TFile."""
try:
g = ROOT.gDirectory.Get(key)
x = np.array(g.GetX())
y = np.array(g.GetY())
except AttributeError:
raise ValueError('Possibly missing file "%s" or missing key'
' "%s" within that file?' % (f, key))
return x, y
rfile = ROOT.TFile(fpath) # pylint: disable=no-member
try:
energy = None
xsec = FlavIntData()
for flavint in ALL_NUFLAVINTS:
if ver == 'genie_2.6.4':
# Expected to contain xsect per atom; summing 2*Hydrogen
# and 1*Oxygen yields total cross section for water
# molecule.
# Format as found in, e.g., "genie_2.6.4_simplified.root"
key = str(flavint) + o_sfx
o16_e, o16_xs = extractData(rfile, key)
key = str(flavint) + h_sfx
h1_e, h1_xs = extractData(rfile, key)
tot_xs = h1_xs*2 + o16_xs*1
assert np.all(h1_e == o16_e)
ext_e = o16_e
elif ver == 'genie_2.8.6':
# Expected to contain xsect-per-nucleon-per-energy, so
# multiplying by energy and by # of nucleons (18) yields
# cross sections per molecule.
# Format as found in, e.g., "genie_2.8.6_simplified.root"
key = str(flavint) + plt_sfx
ext_e, fract_xs = extractData(rfile, key)
tot_xs = fract_xs * ext_e * 18
else:
raise ValueError('Invalid or not implemented `ver`: "%s"'
% ver)
if energy is None:
energy = ext_e
assert np.all(ext_e == energy)
# Note that units in the ROOT files are [1e-38 cm^2] but PISA
# requires units of [m^2], so this conversion is made here.
xsec[flavint] = tot_xs * 1e-38 * 1e-4
finally:
rfile.Close()
CrossSections.validate_xsec(energy, xsec)
return energy, xsec
[docs]
@classmethod
def new_from_root(cls, fpath, ver, **kwargs):
"""Instantiate a new CrossSections object from ROOT file.
Parameters
----------
fpath : string
PISA resource specification for location of ROOT file
ver : string
Specify the version name of the cross sections loaded
**kwargs
Passed to method load_root_file()
Returns
-------
Instantiated CrossSections object
"""
energy, xsec = CrossSections.load_root_file(fpath, ver=ver, **kwargs)
return cls(energy=energy, xsec=xsec, ver=ver)
[docs]
@staticmethod
def validate_xsec(energy, xsec):
"""Validate cross sections"""
# TODO: different validation based on cross sections version string
# Make sure the basics are present
xsec = FlavIntData(xsec)
## No NaN's
assert not np.any(np.isnan(energy))
# Energy spans at least 1-100 GeV
assert np.min(energy) <= 1
assert np.max(energy) >= 100
# All event flavints need to be present
for k in ALL_NUFLAVINTS:
# Uses "standard" PISA indexing scheme
x = xsec[k]
# Arrays are same lengths
assert len(x) == len(energy)
# No NaN's
assert np.sum(np.isnan(x)) == 0
# Max xsec/energy value is in range for units of [m^2/GeV]
assert np.max(x/energy) < 40e-42, np.max(x/energy)
[docs]
def set_version(self, ver):
"""Set the cross sections version to the string `ver`."""
self._ver = ver
[docs]
def get_version(self):
"""Return the cross sections version string"""
return self._ver
[docs]
def save(self, fpath, ver=None, **kwargs): # pylint: disable=arguments-differ
"""Save cross sections (and the energy specification) to a file at
`fpath`."""
if ver is None:
if self._ver is None:
raise ValueError(
'Either a ver must be specified in call to `save` or it '
'must have been set prior to the invocation of `save`.'
)
ver = self._ver
else:
assert ver == self._ver
try:
fpath = find_resource(fpath)
except IOError:
pass
fpath = os.path.expandvars(os.path.expanduser(fpath))
all_xs = {}
# Get any existing data from file
if os.path.exists(fpath):
all_xs = from_file(fpath)
# Validate existing data by instantiating objects from each
for v, d in all_xs.items():
CrossSections(ver=v, energy=d['energy'], xsec=d['xsec'])
if ver in all_xs:
logging.warning('Overwriting existing version "' + ver +
'" in file ' + fpath)
all_xs[ver] = {'xsec':self, 'energy':self.energy}
to_file(all_xs, fpath, **kwargs)
[docs]
def get_xs_value(self, flavintgroup, energy):
"""Get (combined) cross section value (in units of m^2) for
`flavintgroup` at `energy` (in units of GeV).
Parameters
----------
flavintgroup : NuFlavIntGroup or convertible thereto
energy : numeric or sequence thereof
Energy (or energies) at which to evaluate total cross section, in
units of GeV
Returns
-------
Combined cross section for flavor/interaction types in units of
m^2, evaluated at each energy. Shape of returned value matches that of
passed `energy` parameter.
"""
flavintgroup = NuFlavIntGroup(flavintgroup)
if flavintgroup not in self._interpolants:
self._define_interpolant(flavintgroup=flavintgroup)
return self._interpolants[flavintgroup](energy)
[docs]
def get_xs_ratio_value(self, flavintgroup0, flavintgroup1, energy,
gamma=0):
"""Get ratio of combined cross sections for `flavintgroup0` to combined
cross sections for `flavintgroup1`, weighted by E^{-`gamma`}.
Parameters
----------
flavintgroup0, flavintgroup1 : NuFlavIntGroup or convertible thereto
energy : numeric or sequence thereof
Energy (or energies) at which to evaluate total cross section, in
units of GeV
Returns
-------
Ratio of combined cross sections flavintgroup0 / flavintgroup1
evaluated at each energy. Shape of returned value matches that of
passed `energy` parameter.
"""
flavintgroup0 = NuFlavIntGroup(flavintgroup0)
flavintgroup1 = NuFlavIntGroup(flavintgroup1)
self._define_interpolant(flavintgroup=flavintgroup0)
self._define_interpolant(flavintgroup=flavintgroup1)
xs_ratio_vals = self._interpolants[flavintgroup0](energy) / \
self._interpolants[flavintgroup1](energy)
# Special case to avoid multiplying by array of ones
if gamma == 0:
return xs_ratio_vals
return xs_ratio_vals * energy**(-gamma)
def _define_interpolant(self, flavintgroup=None):
"""If `flavintgroup` is None, compute all (separate) flavint
interpolants; otherwise, compute interpolant for specified
`flavintgroup`. Do not re-compute if already present.
"""
if flavintgroup is None:
flavintgroups = [NuFlavIntGroup(fi) for fi in self.flavints]
else:
flavintgroups = [NuFlavIntGroup(flavintgroup)]
for fig in flavintgroups:
if fig in self._interpolants:
continue
combined_xs = self._combine_xs(fig)
self._interpolants[fig] = interp1d(
x=self.energy, y=combined_xs, kind='linear', copy=False,
bounds_error=True, fill_value=0
)
def _combine_xs(self, flavintgroup):
"""Combine all cross sections specified by the flavints in
`flavintgroup`. All CC and NC interactions are separately grouped
together and averaged, then the average of each interaction type
is added to the other.
If CC and NC interactions are present, they *must* be from the same
flavor(s). I.e., it doesn't make sense (and so causes an exception) if
you combine numu CC with numubar NC. It does make sense if you combine
numu and numubar CC with numu and numubar NC, though, and this is
allowed.
Notes
-----
Does not yet implement *Ngen/spectrum-weighted* averages, which are
necessary when combining cross sections of disparate flavor/interaction
types from different Monte Carlo simulation runs.
"""
flavintgroup = NuFlavIntGroup(flavintgroup)
# Trivial case: nothing to combine
if len(flavintgroup.flavints) == 1:
return self[flavintgroup.flavints[0]]
cc_flavints = flavintgroup.cc_flavints
nc_flavints = flavintgroup.nc_flavints
if cc_flavints and nc_flavints:
assert flavintgroup.cc_flavs == flavintgroup.nc_flavs, \
'Combining CC and NC but CC flavors do not match NC flavors'
cc_avg_xs = 0
if cc_flavints:
logging.trace('cc_flavints = %s' % (cc_flavints,))
cc_avg_xs = np.sum([self[k] for k in cc_flavints], axis=0) \
/ len(cc_flavints)
nc_avg_xs = 0
if nc_flavints:
logging.trace('nc_flavints = %s' % (nc_flavints,))
nc_avg_xs = np.sum([self[k] for k in nc_flavints], axis=0) \
/ len(nc_flavints)
tot_xs = cc_avg_xs + nc_avg_xs
logging.trace('mean(tot_xs) = %s' % (np.mean(tot_xs),))
return tot_xs
[docs]
def get_xs_ratio_integral(self, flavintgroup0, flavintgroup1, e_range,
gamma=0, average=False):
"""Energy-spectrum-weighted integral of (possibly a ratio of)
(possibly-combined) flavor/interaction type cross sections.
Parameters
----------
flavintgroup0 : NuFlavIntGroup or convertible thereto
Flavor(s)/interaction type(s) for which to combine cross sections
for numerator of ratio
flavintgroup1 : None, NuFlavIntGroup or convertible thereto
Flavor(s)/interaction type(s) for which to combine cross sections
for denominator of ratio. If None is passed, the denominator of
the "ratio" is effectively 1.
e_range
Range of energy over which to integrate (GeV)
gamma : float >= 0
Power law spectral index used for weighting the integral,
E^{-`gamma`}. Note that `gamma` should be >= 0.
average : bool
If True, return the average of the cross section (ratio)
If False, return the integral of the cross section (ratio)
See also
--------
See _combine_xs for detals on how flavints are combined.
"""
e_min = min(e_range)
e_max = max(e_range)
assert e_min > 0, '`e_range` must lie strictly above 0'
assert e_max > e_min, \
'max(`e_range`) must be strictly larger than min(`e_range`)'
assert gamma >= 0, '`gamma` must be >= 0'
if flavintgroup1 is None:
flavintgroups = [NuFlavIntGroup(flavintgroup0)]
else:
flavintgroups = [NuFlavIntGroup(flavintgroup0),
NuFlavIntGroup(flavintgroup1)]
# Create interpolant(s) (to get xs at energy range's endpoints)
for fg in flavintgroups:
self._define_interpolant(flavintgroup=fg)
all_energy = self._interpolants[flavintgroups[0]].x
xs_data = [self._interpolants[fg].y for fg in flavintgroups]
for xd in xs_data:
logging.trace('mean(xs_data) = %e' % np.mean(xd))
# Get indices of data points within the specified energy range
idx = (all_energy > e_min) & (all_energy < e_max)
# Get xsec at endpoints
xs_endpoints = [self._interpolants[fg]((e_min, e_max))
for fg in flavintgroups]
for ep in xs_endpoints:
logging.trace('xs_emin = %e, xs_emax = %e' % (ep[0], ep[1]))
# Attach endpoints
energy = np.concatenate([[e_min], all_energy[idx], [e_max]])
xs = [np.concatenate([[ep[0]], xsd[idx], [ep[1]]])
for ep, xsd in zip(xs_endpoints, xs_data)]
if len(xs) == 1:
xs = xs[0]
else:
xs = xs[0] / xs[1]
# Weight xsec (or ratio) by energy spectrum
if gamma == 0:
wtd_xs = xs
else:
wtd_xs = xs*energy**(-gamma)
logging.trace('mean(wtd_xs) = %e' % np.mean(wtd_xs))
# Integrate via trapezoidal rule
wtd_xs_integral = np.trapz(y=wtd_xs, x=energy)
logging.trace('wtd_xs_integral = %e' % wtd_xs_integral)
# Need to divide by integral of the weight function (over the same
# energy interval as wtd_xs integral was computed) to get the average
if average:
if gamma == 0:
# Trivial case
xs_average = wtd_xs_integral / (e_max - e_min)
else:
# Otherwise use trapezoidal rule to approximate integral
xs_average = wtd_xs_integral / \
np.trapz(y=energy**(-gamma), x=energy) #* (e_max-e_min)
logging.trace('xs_average = %e' %(xs_average))
return xs_average
return wtd_xs_integral
#def get_xs_integral(self, flavintgroup, e_range, gamma=0, average=False):
# """Energy-spectrum-weighted integral or average of (possibly-combined)
# flavor/interaction type cross section.
# Parameters
# ----------
# flavintgroup : NuFlavIntGroup or convertible thereto
# Flavor(s)/interaction type(s) for which to combine cross sections
# e_range
# Range of energy over which to integrate (GeV)
# gamma : float >= 0
# Power law spectral index used for weighting the integral,
# E^{-`gamma`}. Note that `gamma` should be >= 0.
# average : bool
# True: return the average of the cross section over `e_range`
# False: return the integral of the cross section over `e_range`
# of flavints specfied in `flavintgroup`; if `gamma` specified, weight
# integral by simulated spectrum with that power-spectral index
# (i.e.: E^{-gamma}).
#
# Specifying `average`=True yields the weighted-average of cross section.
# See also
# --------
# See _combine_xs for detals on how flavints are combined.
# """
# return self.get_xs_ratio_integral(
# flavintgroup0=flavintgroup, flavintgroup1=None,
# e_range=e_range, gamma=gamma, average=average
# )
[docs]
def plot(self, save=None):
"""Plot cross sections per GeV; optionally, save plot to file. Requires
matplotlib and optionally uses seaborn to set "pretty" defaults.
save : None, str, or dict
If None, figure is not saved (default)
If a string, saves figure as that name
If a dict, calls pyplot.savefig(**save) (i.e., save contains
keyword args to savefig)
"""
import matplotlib.pyplot as plt
if self._ver is None:
leg_ver = ''
else:
leg_ver = self._ver + '\n'
leg_fs = 11
figsize = (9, 6)
alpha = 1.0
f = plt.figure(figsize=figsize)
f.clf()
ax1 = f.add_subplot(121)
ax2 = f.add_subplot(122)
ls = [dict(lw=5, ls='-'),
dict(lw=2, ls='-'),
dict(lw=1, ls='--'),
dict(lw=5, ls='-'),
dict(lw=3, ls='-.'),
dict(lw=1, ls='-')]
energy = self.energy
nc_n = cc_n = 0
for flavint in list(ALL_NUFLAVINTS.particles) + \
list(ALL_NUFLAVINTS.antiparticles):
# Convert from [m^2] to [1e-38 cm^2]
xs = self[flavint] * 1e38 * 1e4
if flavint.cc:
ax1.plot(energy, xs/energy,
alpha=alpha,
label='$%s$' % flavint.flav.tex,
**ls[cc_n%len(ls)])
cc_n += 1
else:
ax2.plot(energy, xs/energy,
alpha=alpha,
label='$%s$' % flavint.flav.tex,
**ls[nc_n%len(ls)])
nc_n += 1
l1 = ax1.legend(title=leg_ver +
r'$\nu+{\rm H_2O}$, total CC',
fontsize=leg_fs, frameon=False, ncol=2)
l2 = ax2.legend(title=leg_ver +
r'$\nu+{\rm H_2O}$, total NC',
fontsize=leg_fs, frameon=False, ncol=2)
for (ax, leg) in [(ax1, l1), (ax2, l2)]:
ax.set_xlim(0.1, 100)
ax.set_xscale('log')
ax.set_xlabel(r'$E_\nu\,{\rm [GeV]}$')
ax.set_ylabel(
r'$\sigma(E_\nu)/E_\nu\quad ' +
r'\left[10^{-38} \mathrm{cm^2} \mathrm{GeV^{-1}}\right]$'
)
leg.get_title().set_fontsize(leg_fs)
plt.tight_layout()
if isinstance(save, str):
logging.info('Saving cross sections plots to file "%s"', save)
f.savefig(save)
elif isinstance(save, dict):
logging.info('Saving cross sections plots via `figure.save(**%s)`', save)
f.savefig(**save)
[docs]
def manual_test_CrossSections(outdir=None):
"""Unit tests for CrossSections class"""
from shutil import rmtree
from tempfile import mkdtemp
remove_dir = False
if outdir is None:
remove_dir = True
outdir = mkdtemp()
try:
# "Standard" location of cross sections file in PISA; retrieve 2.6.4 for
# testing purposes
pisa_xs_file = 'cross_sections/cross_sections.json'
xs = CrossSections(ver='genie_2.6.4', xsec=pisa_xs_file)
# Location of the root file to use (not included in PISA at the moment)
#test_dir = expand(os.path.join('/tmp', 'pisa_tests', 'cross_sections'))
#root_xs_file = os.path.join(test_dir, 'genie_2.6.4_simplified.root')
try:
root_xs_file = find_resource(
os.path.join(
#'tests', 'data', 'xsec', 'genie_2.6.4_simplified.root'
'cross_sections', 'genie_xsec_H2O.root'
)
)
except Exception:
root_xs_file = None
# Make sure that the XS newly-imported from ROOT match those stored in
# PISA
if root_xs_file:
xs_from_root = CrossSections.new_from_root(root_xs_file, ver='genie_2.6.4')
logging.info(
'Found and loaded ROOT source cross sections file %s', root_xs_file
)
assert xs_from_root.allclose(xs, rtol=1e-7)
# Check XS ratio for numu_cc to numu_cc + numu_nc (user must inspect)
kg0 = NuFlavIntGroup('numu_cc')
kg1 = NuFlavIntGroup('numu_nc')
logging.info(
r'\int_1^80 xs(numu_cc) E^{-1} dE = %e',
xs.get_xs_ratio_integral(kg0, None, e_range=[1, 80], gamma=1)
)
logging.info(
'(int E^{-gamma} * (sigma_numu_cc)/int(sigma_(numu_cc+numu_nc)) dE)'
' / (int E^{-gamma} dE) = %e',
xs.get_xs_ratio_integral(kg0, kg0+kg1, e_range=[1, 80], gamma=1,
average=True)
)
# Check that XS ratio for numu_cc+numu_nc to the same is 1.0
int_val = xs.get_xs_ratio_integral(kg0+kg1, kg0+kg1, e_range=[1, 80],
gamma=1, average=True)
if not recursiveEquality(int_val, 1):
raise ValueError('Integral of nc + cc should be 1.0; get %e'
' instead.' % int_val)
# Check via plot that the
# Plot all cross sections stored in PISA xs file
try:
alldata = from_file(pisa_xs_file)
xs_versions = alldata.keys()
for ver in xs_versions:
xs = CrossSections(ver=ver, xsec=pisa_xs_file)
xs.plot(save=os.path.join(
outdir, 'pisa_' + ver + '_nuxCCNC_H2O_cross_sections.pdf'
))
except ImportError as exc:
logging.debug('Could not plot; possible that matplotlib not'
'installed. ImportError: %s', exc)
finally:
if remove_dir:
rmtree(outdir)
if __name__ == "__main__":
set_verbosity(1)
manual_test_CrossSections()