"""PISA data container"""
from __future__ import absolute_import, division, print_function
import argparse
from collections.abc import Mapping, Iterable, Sequence
from collections import OrderedDict
import copy
import re
import numpy as np
from pisa import FTYPE
from pisa.core.binning import OneDimBinning, MultiDimBinning
from pisa.utils.fileio import from_file
from pisa.utils.log import logging
__all__ = [
"NU_FLAVORS",
"NU_INTERACTIONS",
"OUTPUT_NUFLAVINT_KEYS",
"LEGACY_FLAVKEY_XLATION",
"EventsPi",
"split_nu_events_by_flavor_and_interaction",
"fix_oppo_flux",
"main",
]
__author__ = "T. Stuttard"
__license__ = """Copyright (c) 2014-2018, The IceCube Collaboration
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
# Define the flavors and interactions for neutrino events
# PDG codes: positive values are particles, negative values are antiparticles
NU_FLAVORS = OrderedDict(
    nue=12, nuebar=-12, numu=14, numubar=-14, nutau=16, nutaubar=-16
)
# Interaction type codes: charged current (cc) and neutral current (nc)
NU_INTERACTIONS = OrderedDict(cc=1, nc=2)
# All expected event-category keys, e.g. "nue_cc", "nue_nc", ..., "nutaubar_nc"
OUTPUT_NUFLAVINT_KEYS = tuple(
    "%s_%s" % (fk, ik)
    for fk, fc in NU_FLAVORS.items()
    for ik, ic in NU_INTERACTIONS.items()
)
# Translation from legacy flavor key spellings (e.g. "nue_bar", with an
# underscore before "bar") to the canonical spellings used here
LEGACY_FLAVKEY_XLATION = dict(
    nue="nue",
    nuebar="nuebar",
    nue_bar="nuebar",
    numu="numu",
    numubar="numubar",
    numu_bar="numubar",
    nutau="nutau",
    nutaubar="nutaubar",
    nutau_bar="nutaubar",
)
# Backwards compatibility fixes: mapping of current flux variable names (keys)
# to the legacy "oppo"-style names (values), for nu event categories...
OPPO_FLUX_LEGACY_FIX_MAPPING_NU = {
    "nominal_nue_flux" : "neutrino_nue_flux",
    "nominal_numu_flux" : "neutrino_numu_flux",
    "nominal_nuebar_flux" : "neutrino_oppo_nue_flux",
    "nominal_numubar_flux" : "neutrino_oppo_numu_flux",
}
# ...and for nubar event categories (the legacy "oppo" convention swapped
# nu <-> nubar, hence the mirrored mapping; see `fix_oppo_flux`)
OPPO_FLUX_LEGACY_FIX_MAPPING_NUBAR = {
    "nominal_nue_flux" : "neutrino_oppo_nue_flux",
    "nominal_numu_flux" : "neutrino_oppo_numu_flux",
    "nominal_nuebar_flux" : "neutrino_nue_flux",
    "nominal_numubar_flux" : "neutrino_numu_flux",
}
def append_arrays_dict(key, val, sdict):
    '''
    Helper function for appending multiple dicts of arrays (e.g. from
    multiple input files) into a single dict of arrays.

    Parameters
    ----------
    key : str
        Key under which `val` is stored/appended in `sdict`.
    val : numpy.ndarray or Mapping
        Either a leaf array to append, or a sub-dict to recurse into.
    sdict : MutableMapping
        The accumulator dict; modified in place.
    '''
    if isinstance(val, Mapping):
        # Handle sub-dict: ensure the matching sub-dict exists in the
        # accumulator (hoisted out of the loop -- previously this was
        # re-checked on every iteration, and an empty `val` never created
        # the key at all), then recurse into each member
        if key not in sdict:
            sdict[key] = OrderedDict()
        for sub_key, sub_val in val.items():
            append_arrays_dict(sub_key, sub_val, sdict[key])
    else:
        # Have now reached a leaf variable: must be a numpy array
        assert isinstance(val, np.ndarray), "'%s' is not an array, is a %s" % (key, type(val))
        if key in sdict:
            # Append to data accumulated from earlier files
            sdict[key] = np.append(sdict[key], val)
        else:
            sdict[key] = val
class EventsPi(OrderedDict):
    """
    Container for events for use with PISA pi

    Parameters
    ----------
    name : string, optional
        Name to identify events
    neutrinos : bool, optional
        Flag indicating if events represent neutrinos; toggles special
        behavior such as splitting into nu/nubar and CC/NC. Default is True.
    fraction_events_to_keep : float, default: None
        Fraction of loaded events to use (use to downsample).
        Must be in range (0.,1.]. Disabled by setting to `None`.
    events_subsample_index : int >= 0, default: 0
        If `fraction_events_to_keep` is not `None`, determines
        which of the statistically independent sub-samples
        (uniquely determined by the seed passed to `load_events_file`)
        to select.
    *args, **kwargs
        Passed on to `__init__` method of OrderedDict
    """

    def __init__(
        self,
        *args,
        name=None,
        neutrinos=True,
        fraction_events_to_keep=None,
        events_subsample_index=0,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.name = name
        self.neutrinos = neutrinos
        self.fraction_events_to_keep = fraction_events_to_keep
        self.events_subsample_index = events_subsample_index

        # Checks for down-sampling inputs
        if self.fraction_events_to_keep is not None:
            # Check `fraction_events_to_keep` value is in the required range.
            # Note that 0. is excluded: keeping no events is never useful, and
            # a value of exactly 0. previously slipped past the (inclusive)
            # range check only to crash with ZeroDivisionError in the
            # `max_index` computation below.
            self.fraction_events_to_keep = float(self.fraction_events_to_keep)
            assert (self.fraction_events_to_keep > 0.) and (self.fraction_events_to_keep <= 1.), "`fraction_events_to_keep` must be in range (0.,1.], or None to disable"

            # Check `fraction_events_to_keep` and `events_subsample_index` values are compatible
            assert isinstance(self.events_subsample_index, int), "`events_subsample_index` must be an integer"
            assert self.events_subsample_index >= 0, f"`events_subsample_index` = {self.events_subsample_index}, but must be >= 0"
            # Only floor(1/fraction) disjoint sub-samples of this size exist
            max_index = int(np.floor(1. / self.fraction_events_to_keep)) - 1
            assert self.events_subsample_index <= max_index, f"`events_subsample_index` = {self.events_subsample_index} is too large given `fraction_events_to_keep` = {self.fraction_events_to_keep} (max is {max_index})"

        # Define some metadata
        # TODO Is this out of date?
        self.metadata = OrderedDict(
            [
                ("detector", ""),
                ("geom", ""),
                ("runs", []),
                ("proc_ver", ""),
                ("cuts", []),
            ]
        )
def load_events_file(self, events_file, variable_mapping=None, required_metadata=None, seed=123456):
"""Fill this events container from an input HDF5 file filled with event
data. Optionally can provide a variable mapping so select a subset of
variables, rename them, etc.
Parameters
----------
events_file : string or mapping
If string, interpret as a path and load file at that path; the
loaded object should be a mapping. If already a mapping, take and
interpret events from that.
variable_mapping : mapping, optional
If specified, should be a mapping where the keys are the
destination variable names and the items are either the source
variable names or an iterable of source variables names. In the
latter case, each of the specified source variables will become a
column vector in the destination array.
required_metadata : sequence of str, default: None
Can optionally specify metadata keys to parse from the input file metdata.
ONLY metadata specified here will be parsed.
Anything specified here MUST exist in the files.
seed : int, default: 123456
If `fraction_events_to_keep` is not `None`, serves
as random seed for generating reproducible sub-samples.
"""
# Validate `events_file`
if not isinstance(events_file, (str, Mapping, Sequence)):
raise TypeError(
"`events_file` must be either string or mapping; got (%s)"
% type(events_file)
)
# Validate `variable_mapping`
if variable_mapping is not None:
if not isinstance(variable_mapping, Mapping):
raise TypeError("'variable_mapping' must be a mapping (e.g., dict)")
for dst, src in variable_mapping.items():
if not isinstance(dst, str):
raise TypeError("`variable_mapping` 'dst' (key) must be a string")
if isinstance(src, str):
pass # Nothing to do
elif isinstance(src, Iterable):
for v in src:
if not isinstance(v, str):
raise TypeError(
"`variable_mapping` 'src' (value) has at least"
" one element that is not a string"
)
else:
raise TypeError(
"`variable_mapping` 'src' (value) must be a string or"
" an iterable of strings"
)
# Validate `required_metadata`
if required_metadata is not None :
assert isinstance(required_metadata, Sequence)
assert all([ isinstance(k, str) for k in required_metadata ])
# Reporting
if self.fraction_events_to_keep is not None :
logging.info("Down-sampling events (keeping %0.2g%% of the total). Will take sub-sample %i." % (100.*self.fraction_events_to_keep, self.events_subsample_index))
#
# Loop over files
#
input_data = OrderedDict()
metadata = OrderedDict()
# Handle list of files vs single file
events_files_list = []
if isinstance(events_file, str):
events_files_list = [events_file]
elif isinstance(events_file, Mapping):
events_files_list = [events_file]
elif isinstance(events_file, Sequence):
events_files_list = events_file
# Loop over files
for i_file, infile in enumerate(events_files_list) :
#
# Parse variables from file
#
# Read the file
# If `variable_mapping` was specified, only load those variables (saves time/memory)
if isinstance(infile, str):
# If user provided a variable mapping, only load the requested variables.
# Remember to andle cases where the variable is defined as a list of variables in
# the cfg file.
if variable_mapping is None :
choose = None
else :
choose = []
for var_name in variable_mapping.values() :
if isinstance(var_name, str) :
choose.append(var_name)
elif isinstance(var_name, Sequence) :
for sub_var_name in var_name :
assert isinstance(sub_var_name, str), "Unknown variable format, must be `str`"
choose.append(sub_var_name)
else :
raise IOError("Unknown variable name format, must be `str` or list of `str`")
# Handle "oppo" flux backwards compatibility
# This means adding the old variable names into the chosen variable list
# The actual renaming is done later by `fix_oppo_flux`
if variable_mapping is not None :
for var_name in choose :
if var_name in OPPO_FLUX_LEGACY_FIX_MAPPING_NU :
choose.append( OPPO_FLUX_LEGACY_FIX_MAPPING_NU[var_name] )
if var_name in OPPO_FLUX_LEGACY_FIX_MAPPING_NUBAR :
choose.append( OPPO_FLUX_LEGACY_FIX_MAPPING_NUBAR[var_name] )
# Load the file
file_input_data = from_file(infile, choose=choose)
if not isinstance(file_input_data, Mapping):
raise TypeError(
'Contents loaded from "%s" must be a mapping; got: %s'
% (infile, type(file_input_data))
)
assert len(file_input_data) > 0, "No input data found"
# File already loaded
elif isinstance(infile, Mapping) :
file_input_data = infile
# Add to overall container
for k, v in file_input_data.items() :
append_arrays_dict(k, v, input_data)
#
# Parse metadata from file
#
if required_metadata is not None :
# Events and EventsPi objects have attr `metadata`
file_metadata = getattr(file_input_data, 'metadata', None)
# HDF files have attr `attrs` attached, if present (see pisa.utils.hdf)
if not file_metadata:
file_metadata = getattr(file_input_data, 'attrs', None)
if file_metadata:
# Check format
if not isinstance(file_metadata, Mapping):
raise TypeError(
"metadata or attrs expected to be a Mapping, but got {}".format(
type(file_metadata)
)
)
# Loop over expected metadata
for k in required_metadata :
assert k in file_metadata, "Expected metadata '%s' not found" % k
# For the special case of livetime, append livetiem from each file
# Otherwise, expect identical value in all cases
if k in self.metadata :
if k == "livetime" :
self.metadata[k] += file_metadata[k]
else :
assert self.metadata[k] == file_metadata[k]
else :
self.metadata[k] = file_metadata[k]
#
# Re-format inputs
#
# The following is intended to re-format input data into the desired
# format. This is required to handle various inout cases and to ensure
# backwards compatibility with older input file formats.
# Convert to the required event keys, e.g. "numu_cc", "nutaubar_nc", etc.
if self.neutrinos:
input_data = split_nu_events_by_flavor_and_interaction(input_data)
# The value for each category should itself be a dict of the event
# variables, where each entry is has a variable name as the key and an
# np.array filled once per event as the value.
#
# For backwards compatibility, convert to this format from known older
# formats first
if self.neutrinos:
for key, cat_dict in input_data.items():
if not isinstance(cat_dict, Mapping):
raise Exception(
"'%s' input data is not a mapping, unknown format (%s)"
% (key, type(cat_dict))
)
for var_key, var_data in cat_dict.items():
if not isinstance(var_data, np.ndarray):
raise Exception(
"'%s/%s' input data is not a numpy array, unknown"
" format (%s)" % (key, var_key, type(var_data))
)
# Ensure backwards compatibility with the old style "oppo" flux
# variables
if self.neutrinos:
fix_oppo_flux(input_data)
#
# Load the event data
#
# Should be organised under a single layer of keys, each representing
# some category of input data
# Loop over the input types
for data_key in input_data.keys():
if data_key in self:
raise ValueError(
"Key '%s' has already been added to this data structure"
)
self[data_key] = OrderedDict()
# Loop through variable mapping
# If none provided, just use all variables and keep the input names
if variable_mapping is None:
variable_mapping_to_use = tuple(
zip(input_data[data_key].keys(), input_data[data_key].keys())
)
else:
variable_mapping_to_use = variable_mapping.items()
# Init stuff for down-sampling later
chosen_event_indices = None
rand = np.random.RandomState(seed) # Enforce same sample each time
# Get the array data (stacking if multiple input variables defined)
# and check the variable exists in the input data
for var_dst, var_src in variable_mapping_to_use:
# TODO What about non-float data? Use dtype...
# Prepare for the stacking
array_data = None
if isinstance(var_src, str):
var_src = [var_src]
# Perform the stacking
array_data_to_stack = []
for var in var_src:
if var in input_data[data_key]:
array_data_to_stack.append(
input_data[data_key][var].astype(FTYPE)
)
else:
raise KeyError(
"Variable '%s' cannot be found for '%s' events"
% (var, data_key)
)
# Note `squeeze` removes the extraneous 2nd dim in case of a
# single `src`
array_data = np.squeeze(np.stack(array_data_to_stack, axis=1))
# Check actually have some data
if array_data is None:
raise ValueError(
"Cannot find source variable(s) '%s' for '%s'"
% (var_src, data_key)
)
#
# Event down sampling
#
# Only if requested by user
if self.fraction_events_to_keep is not None:
# Define events to keep only once for each species (e.g. same choice for all variables for a given species)
if chosen_event_indices is None :
# Get intitial conditions
initial_num_events = array_data.size
desired_num_events = int( self.fraction_events_to_keep * float(initial_num_events) )
# Start with all events as input
current_event_indices = np.array( range(initial_num_events) )
# Loop over subsamples (will break out once reach desired subsample)
i = 0
while True :
# Get indices for the events to keep for this current sub-sample
assert current_event_indices.size >= desired_num_events, "Not enough events available" # Earlier checks on `fraction_events_to_keep` and `events_subsample_index` should prevent this error ever happening
chosen_event_indices = np.sort( rand.choice(current_event_indices, replace=False, size=desired_num_events) )
# If this is the requested sub-sample, done here
if i == self.events_subsample_index :
break
# Otherwise have not yet reached our subsample.
# Choose the remaining events as the new input events in the algorithm,
# and on the next iteration of this loop these remaining events will be
# used for extracting the new sub-sample.
# This will result in statistically independent sub-samples
remaining_event_indices = np.sort( np.setxor1d(current_event_indices, chosen_event_indices) )
current_event_indices = remaining_event_indices
i += 1
# Report
logging.info("Down-sampled %s events : %i -> %i (%0.2g%%)" % ( data_key, array_data.size, chosen_event_indices.size, 100.*(chosen_event_indices.size/array_data.size) ))
# Extract just the requested events
array_data = array_data[chosen_event_indices]
# Add to array
self[data_key][var_dst] = array_data
def apply_cut(self, keep_criteria):
"""Apply a cut by specifying criteria for keeping events. The cut must
be successfully applied to all flav/ints in the events object before
the changes are kept, otherwise the cuts are reverted.
Parameters
----------
keep_criteria : string
Any string interpretable as numpy boolean expression.
Examples
--------
Keep events with true energies in [1, 80] GeV (note that units are not
recognized, so have to be handled outside this method)
>>> events = events.apply_cut("(true_energy >= 1) & (true_energy <= 80)")
Do the opposite with "~" inverting the criteria
>>> events = events.apply_cut("~((true_energy >= 1) & (true_energy <= 80))")
Numpy namespace is available for use via `np` prefix
>>> events = events.apply_cut("np.log10(true_energy) >= 0")
"""
assert isinstance(keep_criteria, str)
# Check if have already applied these cuts
if keep_criteria in self.metadata["cuts"]:
logging.debug(
"Criteria '%s' have already been applied. Returning"
" events unmodified.",
keep_criteria,
)
return self
# TODO Get everything from the GPU first ?
# Prepare the post-cut data container
cut_data = EventsPi(name=self.name)
cut_data.metadata = copy.deepcopy(self.metadata)
# Loop over the data containers
for key in self.keys():
cut_data[key] = {}
# TODO Need to think about how to handle array, scalar and binned data
# TODO Check for `events` data mode, or should this kind of logic
# already be in the Container class?
variables = self[key].keys()
# Create the cut expression, and get the resulting mask
crit_str = keep_criteria
for variable_name in variables:
# using word boundary \b to replace whole words only
crit_str = re.sub(
r'\b{}\b'.format(variable_name),
'self["%s"]["%s"]' % (key, variable_name),
crit_str
)
mask = eval(crit_str) # pylint: disable=eval-used
# Fill a new container with the post-cut data
for variable_name in variables:
cut_data[key][variable_name] = copy.deepcopy(
self[key][variable_name][mask]
)
# TODO update to GPUs?
# Record the cuts
cut_data.metadata["cuts"].append(keep_criteria)
return cut_data
def keep_inbounds(self, binning):
"""Cut out any events that fall outside `binning`. Note that events
that fall exactly on an outer edge are kept.
Parameters
----------
binning : OneDimBinning or MultiDimBinning
Returns
-------
cut_data : EventsPi
"""
# Get the binning instance
try:
binning = OneDimBinning(binning)
except: # pylint: disable=bare-except
pass
if isinstance(binning, OneDimBinning):
binning = [binning]
binning = MultiDimBinning(binning)
# Define a cut to remove events outside of the binned region
bin_edge_cuts = [dim.inbounds_criteria for dim in binning]
bin_edge_cuts = " & ".join([str(x) for x in bin_edge_cuts])
# Apply the cut
return self.apply_cut(bin_edge_cuts)
def __str__(self): # TODO Handle non-array data cases
string = "-----------------------------\n"
string += "EventsPi container %s :" % self.name
for key, container in self.items():
string += " %s :\n" % key
for var, array in container.items():
array_data = array
if len(array_data) <= 4:
array_data_string = str(array_data)
else:
array_data_string = "[%s, %s, ..., %s, %s]" % (
array_data[0],
array_data[1],
array_data[-2],
array_data[-1],
)
string += " %s : %i elements : %s\n" % (
var,
len(array_data),
array_data_string,
)
string += "-----------------------------"
return string
def split_nu_events_by_flavor_and_interaction(input_data):
    """Split neutrino events by nu vs nubar, and CC vs NC.

    Should be compatible with DRAGON and GRECO samples, but this depends on the
    contents of the original I3 files and whatever conversion script was used
    to produce the HDF5 files from these.

    Parameters
    ----------
    input_data : mapping
        Top-level mapping of category name -> dict of per-event variable arrays

    Returns
    -------
    output_data : OrderedDict
        Mapping with keys as in `OUTPUT_NUFLAVINT_KEYS` (e.g. "numu_cc",
        "nutaubar_nc", ...), each holding a dict of variable arrays
    """
    # TODO Split into one function for nu/nubar and one for CC/NC?

    assert isinstance(input_data, Mapping)
    assert input_data, "`input_data` has no members"

    def _append_category(output_data, output_key, category_data):
        '''Store `category_data` (a dict of per-variable arrays) under
        `output_key`, concatenating variable-wise with any data already
        stored for that key.

        Note: previously `np.concatenate(a, b)` was called directly on the
        two *dicts*, which both passed the second dict as the `axis`
        argument and ignored the per-variable structure.
        '''
        if output_key in output_data:
            existing = output_data[output_key]
            for var, array in category_data.items():
                existing[var] = np.concatenate([existing[var], array])
        else:
            output_data[output_key] = OrderedDict(category_data.items())

    output_data = OrderedDict()

    # Loop through subcategories in the input data
    for key, data in input_data.items():

        # If key already is one of the desired keys, nothing new to do
        # Just move the data to the output container
        if key in OUTPUT_NUFLAVINT_KEYS:
            _append_category(output_data, key, data)
            continue

        # Legacy PISA HDF5 files are structured as
        # {"<flavor>": {"<int_type>": data}};
        # and `flavor` can have "_" separating "bar". Remove such underscores
        # and flatten the nested dicts into
        # {"<flavor>_<int_type>": data}
        # format
        if key in LEGACY_FLAVKEY_XLATION:
            new_flav_key = LEGACY_FLAVKEY_XLATION[key]
            for sub_key, sub_data in data.items():
                assert sub_key in ("cc", "nc"), str(sub_key)
                output_key = new_flav_key + "_" + sub_key
                _append_category(output_data, output_key, sub_data)
            continue

        # Otherwise need to split the events by PDG code and interaction type
        assert "pdg_code" in data, "No 'pdg_code' variable found for %s data" % key

        # Check these are neutrino events
        # Note: the dict view must be converted to a list -- passing
        # `NU_FLAVORS.values()` directly makes numpy wrap it in a 0-d object
        # array, so `np.isin` would match nothing
        assert np.all(np.isin(data["pdg_code"], list(NU_FLAVORS.values()))), (
            "%s data does not appear to be a neutrino data" % key
        )

        assert "interaction" in data, (
            "No 'interaction' variable found for %s data" % key
        )

        # Define a mask to select the events for each desired output key
        key_mask_pairs = [
            ("%s_%s" % (fk, ik), (data["pdg_code"] == fc) & (data["interaction"] == ic))
            for fk, fc in NU_FLAVORS.items()
            for ik, ic in NU_INTERACTIONS.items()
        ]

        # Loop over the keys/masks and write the *masked* data for each class
        # to the output container (previously the full unmasked `data` dict
        # was stored, putting every event into every non-empty category)
        for mkey, mask in key_mask_pairs:
            if np.any(mask):  # Only if mask has some data
                masked = OrderedDict(
                    (var, array[mask]) for var, array in data.items()
                )
                _append_category(output_data, mkey, masked)

    if len(output_data) == 0:
        raise ValueError("Failed splitting neutrino events by flavor/interaction")

    return output_data
def fix_oppo_flux(input_data):
    """Fix this `oppo` flux insanity
    someone added this in the nominal flux calculation that
    oppo flux is nue flux if flavour is nuebar, and vice versa
    here we revert that, incase these oppo keys are there
    """
    for category, variables in input_data.items():
        # Only categories carrying the legacy "oppo" keys need fixing
        if "neutrino_oppo_nue_flux" not in variables:
            continue
        logging.warning(
            'renaming the outdated "oppo" flux keys in "%s", in the future do'
            " not use those anymore",
            category,
        )
        # Pick the nu vs nubar mapping based on the category name, then
        # rename in place (new key <- legacy key)
        if "bar" in category:
            mapping = OPPO_FLUX_LEGACY_FIX_MAPPING_NUBAR
        else:
            mapping = OPPO_FLUX_LEGACY_FIX_MAPPING_NU
        for new_key, legacy_key in mapping.items():
            variables[new_key] = variables.pop(legacy_key)
def _build_arg_parser():
    """Construct the command-line parser for the events-file dump tool"""
    parser = argparse.ArgumentParser(description="Events parsing")
    parser.add_argument(
        "--neutrinos",
        action="store_true",
        help="Treat input file as if it contains neutrino MC",
    )
    parser.add_argument(
        "-i", "--input-file", type=str, required=True, help="Input HDF5 events file"
    )
    return parser


def main():
    """Load an events file and print the contents"""
    args = _build_arg_parser().parse_args()

    # Load the events from the requested file
    events = EventsPi(neutrinos=args.neutrinos)
    events.load_events_file(args.input_file)
    logging.info("Loaded events from : %s", args.input_file)

    # Dump the contents
    print("Metadata:")
    print(events.metadata)
    print(events)


if __name__ == "__main__":
    main()