Source code for pisa.scripts.add_flux_to_events_file

#! /usr/bin/env python

"""
Add neutrino fluxes for each event.
"""


from __future__ import absolute_import, division, print_function

from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import glob
from os import listdir
from os.path import basename, dirname, isdir, isfile, join, splitext

from pisa.utils.fileio import from_file, to_file, mkdir, nsort
from pisa.utils.flux_weights import load_2d_table, calculate_2d_flux_weights
from pisa.utils.hdf import HDF5_EXTS
from pisa.utils.log import logging, set_verbosity
from pisa.utils.resources import find_resource


__all__ = ['add_fluxes_to_file', 'main']

__license__ = '''Copyright (c) 2014-2025, The IceCube Collaboration

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.'''

[docs] def add_fluxes_to_file(data_file_path, flux_table, flux_name, outdir=None, label=None, overwrite=False): """Add fluxes to PISA events file (e.g. for use by an mc stage) Parameters ----------- data_file_path : str path to events file without fluxes flux_table : str path to flux table, e.g. Honda 2d flux_name : str will be part of flux entry, e.g. "nominal": "nominal_nue_flux" outdir : str or None If None, output is to the same directory as `data_file_path` overwrite : bool, optional """ data = from_file(find_resource(data_file_path)) bname, ext = splitext(basename(data_file_path)) assert ext.lstrip('.') in HDF5_EXTS if outdir is None: outdir = dirname(data_file_path) if label is None: label = '' else: assert isinstance(label, str) label = '_' + label outpath = join(outdir, f'{bname}__with_fluxes{label}{ext}') if not overwrite and isfile(outpath): logging.warning('Output path "%s" already exists, not regenerating', outpath) return mkdir(outdir, warn=False) # Loop over the top-level keys for primary, primary_node in data.items(): # Only handling neutrnio fluxes here, skip past e.g. muon or noise MC events if primary.startswith("nu") : logging.info('Adding fluxes to "%s" events', primary) # Input data may have one layer of hierarchy before the event variables (e.g. [numu_cc]), # or for older files there maybe be a second layer (e.g. [numu][cc]). # Handling either case here... if "true_energy" in primary_node : secondary_nodes = [primary_node] else : secondary_nodes = primary_node.values() for secondary_node in secondary_nodes : true_e = secondary_node['true_energy'] true_cz = secondary_node['true_coszen'] # calculate all 4 fluxes (nue, nuebar, numu and numubar) for table in ['nue', 'nuebar', 'numu', 'numubar']: flux = calculate_2d_flux_weights( true_energies=true_e, true_coszens=true_cz, en_splines=flux_table[table] ) keyname = flux_name + '_' + table + '_flux' secondary_node[keyname] = flux to_file(data, outpath, attrs=data.attrs, overwrite=overwrite) logging.info('--> Wrote file including fluxes to "%s"', outpath)
def parse_args(description=__doc__): """Parse command-line arguments""" parser = ArgumentParser(description=description, formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument( '--input', metavar='(H5_FILE|DIR)', nargs='+', type=str, required=True, help='''Path to a PISA events HDF5 file or a directory containing HDF5 files; output files are copies of this/these, but with flux fields added.''' ) parser.add_argument( '--flux-file', metavar='FLUX_FILE', type=str, required=True, help='''Flux file from which to obtain fluxes, e.g. "flux/honda-2015-spl-solmin-aa.d"''' ) parser.add_argument( '--outdir', metavar='DIR', default=None, help='''Directory to save the output to; if none is provided, output is placed in same dir as --input.''' ) parser.add_argument( '--label', type=str, default=None, help='''Label to give output files. If a label is not specified, default label is the flux file's basename with extension removed.''' ) parser.add_argument( '-v', action='count', default=1, help='''Increase verbosity level _beyond_ INFO level by specifying -v (DEBUG) or -vv (TRACE)''' ) return parser.parse_args()
[docs] def main(): """Run `add_fluxes_to_file` function with arguments from command line""" args = parse_args() set_verbosity(args.v) flux_table = load_2d_table(args.flux_file) flux_file_bname, ext = splitext(basename(args.flux_file)) input_paths = [] for input_path in args.input: if isdir(input_path): for filename in listdir(input_path): filepath = join(input_path, filename) input_paths.append(filepath) else: input_paths += glob.glob(input_path) input_paths = nsort(input_paths) paths_to_process = [] basenames = [] for input_path in input_paths: if isdir(input_path): logging.debug('Path "%s" is a directory, skipping', input_path) continue firstpart, ext = splitext(input_path) if ext.lstrip('.') not in HDF5_EXTS: logging.debug('Path "%s" is not an HDF5 file, skipping', input_path) continue bname = basename(firstpart) if bname in basenames: raise ValueError( f'Found files with duplicate basename "{bname}" (despite files' ' having different paths); resolve the ambiguous names and' ' re-run. Offending files are:\n' f' "{paths_to_process[basenames.index(bname)]}"\n' f' "{input_path}"' ) basenames.append(bname) paths_to_process.append(input_path) logging.info('Will process %d input file(s)...', len(paths_to_process)) for filepath in paths_to_process: logging.info('Working on input file "%s"', filepath) add_fluxes_to_file( data_file_path=filepath, flux_table=flux_table, flux_name='nominal', outdir=args.outdir, label=flux_file_bname )
if __name__ == '__main__': main()