#! /usr/bin/env python
"""
Implementation of the Pipeline object, and a simple script to instantiate and
run a pipeline (the outputs of which can be plotted and stored to disk).
"""
from __future__ import absolute_import
from argparse import ArgumentParser
from collections import OrderedDict
from collections.abc import Mapping
from configparser import NoSectionError
from copy import deepcopy
from importlib import import_module
from itertools import product
from inspect import getsource
import os
from tabulate import tabulate
from time import time
import traceback
import numpy as np
from pisa import ureg
from pisa.core.events import Data
from pisa.core.map import Map, MapSet
from pisa.core.param import ParamSet, DerivedParam
from pisa.core.stage import Stage
from pisa.core.container import Container, ContainerSet
from pisa.core.binning import MultiDimBinning, OneDimBinning, VarBinning
from pisa.utils.config_parser import PISAConfigParser, parse_pipeline_config
from pisa.utils.fileio import mkdir
from pisa.utils.format import format_times
from pisa.utils.hash import hash_obj
from pisa.utils.log import logging, set_verbosity
from pisa.utils.profiler import profile
__all__ = ["Pipeline", "test_Pipeline", "parse_args", "main"]
__author__ = "J.L. Lanfranchi, P. Eller"
__license__ = """Copyright (c) 2014-2025, The IceCube Collaboration
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
# TODO: should we check that the output binning of a previous stage produces
# the inputs required by the current stage, or that the aggregate outputs that
# got produced by previous stages (less those that got consumed in other
# previous stages) hold what the current stage requires for inputs... or
# should we not assume either will check out, since it's possible that the
# stage requires sideband objects that are to be introduced at the top of the
# pipeline by the user (and so there's no way to verify that all inputs are
# present until we see what the user hands the pipeline as its top-level
# input)? Alternatively, the lack of apparent inputs for a stage could show
# a warning message. Or we just wait to see if it fails when the user runs the
# code.
[docs]
class Pipeline():
"""Instantiate stages according to a parsed config object; excecute
stages.
Parameters
----------
config : string, OrderedDict, or PISAConfigParser
If string, interpret as resource location; send to the
`config_parser.parse_pipeline_config()` method to get a config
OrderedDict. If `OrderedDict`, use directly as pipeline configuration.
profile : bool
Perform timings
"""
def __init__(self, config, profile=False):
if isinstance(config, (str, PISAConfigParser)):
config = parse_pipeline_config(config=config)
elif isinstance(config, OrderedDict):
pass
else:
raise TypeError(
"`config` passed is of type %s but must be string,"
" PISAConfigParser, or OrderedDict" % type(config).__name__
)
self.pisa_version = None
self.name = config['pipeline']['name']
self.data = ContainerSet(self.name)
self.detector_name = config['pipeline']['detector_name']
self._output_binning = config['pipeline']['output_binning']
self.output_key = config['pipeline']['output_key']
self._profile = profile
self._setup_times = []
self._run_times = []
self._get_outputs_times = []
self._stages = []
self._config = config
self._init_stages()
self._apply_modes = [s.apply_mode for s in self.stages]
self._source_code_hash = None
if isinstance(self._output_binning, VarBinning):
self.assert_varbinning_compat()
self.assert_exclusive_varbinning()
else:
self.assert_apply_modes_consistency()
# check in case someone decided to add a non-daemonflux parameter with daemon_
# in it, which would potentially make penalty calculation incorrect
if "daemon_chi2" in self.params.names:
num_daemon_params = len([name for name in self.params.names if "daemon_" in name])
assert num_daemon_params == self.params["daemon_params_len"].value.m_as("dimensionless"), \
'Incorrect number of parameters with "daemon_" in their name detected. Non-daemonflux parameters can not have "daemon_" in their name. Rename your non-daemonflux parameters which do not comly!'
self._covariance_set = False
def __repr__(self):
return self.tabulate(tablefmt="presto")
def _repr_html_(self):
return self.tabulate(tablefmt="html")
[docs]
def tabulate(self, tablefmt="plain"):
headers = ['stage number', 'name', 'calc_mode', 'apply_mode', 'has setup', 'has compute', 'has apply', '# fixed params', '# free params']
colalign=["right"] + ["center"] * (len(headers) -1 )
table = []
for i, s in enumerate(self.stages):
table.append([i, s.__class__.__name__, s.calc_mode, s.apply_mode])
table[-1].append(s.setup_function.__func__.__module__ == s.__class__.__module__)
table[-1].append(s.compute_function.__func__.__module__ == s.__class__.__module__)
table[-1].append(s.apply_function.__func__.__module__ == s.__class__.__module__)
table[-1] += [len(s.params.fixed), len(s.params.free)]
return tabulate(table, headers, tablefmt=tablefmt, colalign=colalign)
[docs]
def report_profile(self, detailed=False, format_num_kwargs=None):
"""Report timing information on pipeline and contained services
Parameters
----------
detailed : bool, default False
Whether to increase level of detail
format_num_kwargs : dict, optional
Dictionary containing arguments passed to `utils.format.format_num`.
Will display each number with three decimal digits by default.
"""
if not self.profile:
# Report warning only at the pipeline level, which is what the
# typical user should come across. Assume that users calling
# `report_profile` on a `Stage` instance directly know what they're
# doing.
logging.warn(
'`profile` is set to False. Results may not show the expected '
'numbers of function calls.'
)
if format_num_kwargs is None:
format_num_kwargs = {
'precision': 1e-3, 'fmt': 'full', 'trailing_zeros': True
}
assert isinstance(format_num_kwargs, Mapping)
print(f'Pipeline: {self.name}')
for func_str, times in [
('- setup: ', self._setup_times),
('- run: ', self._run_times),
('- get_outputs: ', self._get_outputs_times)
]:
print(func_str,
format_times(times=times,
nindent_detailed=len(func_str) + 1,
detailed=detailed, **format_num_kwargs)
)
print('Individual services:')
for stage in self.stages:
stage.report_profile(detailed=detailed, **format_num_kwargs)
@property
def profile(self):
return self._profile
@profile.setter
def profile(self, value):
for stage in self.stages:
stage.profile = value
self._profile = value
[docs]
def index(self, stage_id):
"""Return the index in the pipeline of `stage_id`.
Parameters
----------
stage_id : string or int
Name of the stage, or stage number (0-indexed)
Returns
-------
idx : integer stage number (0-indexed)
Raises
------
ValueError : if `stage_id` not in pipeline.
"""
assert isinstance(stage_id, (int, str))
for stage_num, stage in enumerate(self):
if stage_id in [stage_num, stage.stage_name]:
return stage_num
raise ValueError('No stage "%s" found in the pipeline.' % stage_id)
def __len__(self):
return len(self._stages)
def __iter__(self):
return iter(self._stages)
def __getitem__(self, idx):
if isinstance(idx, str):
return self.stages[self.index(idx)]
if isinstance(idx, (int, slice)):
return self.stages[idx]
raise ValueError(
'Cannot locate stage "%s" in pipeline. Stages'
" available are %s." % (idx, self.stage_names)
)
def __getattr__(self, attr):
for stage in self:
if stage.stage_name == attr:
return stage
raise AttributeError(
'"%s" is neither a stage in this pipeline nor an attribute/property'
" of the `Pipeline` object." % attr
)
def _init_stages(self):
"""Stage factory: Instantiate stages specified by self.config.
Conventions required for this to work:
* Stage and service names must be lower-case
* Service implementations must be found at Python path
`pisa.stages.<stage_name>.<service_name>`
* `service` cannot be an instantiation argument for a service
"""
stages = []
for stage_num, item in enumerate(
self.config.items()
):
try:
name, settings = item
if isinstance(name, str):
if name == 'pipeline':
continue
stage_name, service_name = name
# old cfgs compatibility
if service_name.startswith('pi_'):
logging.warning(f"Old stage name `{service_name}` is automatically renamed to `{service_name.replace('pi_', '')}`. " +
"Please change your config in the future!")
service_name = service_name.replace('pi_', '')
logging.debug(
"instantiating stage %s / service %s", stage_name, service_name
)
# Import service's module
logging.trace(f"Importing service module: {stage_name}.{service_name}")
try:
module_path = f"pisa.stages.{stage_name}.{service_name}"
module = import_module(module_path)
except:
logging.debug(
f"Module {stage_name}.{service_name} not found in PISA, trying "
"to import from external definition."
)
module_path = f"{stage_name}.{service_name}"
module = import_module(module_path)
# Get service class from module
service_cls = getattr(module, service_name)
# Instantiate service
logging.trace(
"initializing stage.service %s.%s with settings %s"
% (stage_name, service_name, settings)
)
try:
service = service_cls(**settings, profile=self._profile)
except Exception:
logging.error(
"Failed to instantiate stage.service %s.%s with settings %s",
stage_name,
service_name,
settings.keys(),
)
raise
if not isinstance(service, Stage):
raise TypeError(
'Trying to create service "%s" for stage #%d (%s),'
" but object %s instantiated from class %s is not a"
" PISA Stage type but instead is of type %s."
% (
service_name,
stage_num,
stage_name,
service,
service_cls,
type(service),
)
)
stages.append(service)
except:
logging.error(
"Failed to initialize stage #%d (stage=%s, service=%s).",
stage_num,
stage_name,
service_name,
)
raise
# set parameters with an identical name to the same object
# otherwise we get inconsistent behaviour when setting repeated params
# See Isues #566 and #648
all_parans = self.params
self.update_params(all_parans, existing_must_match=True, extend=False)
param_selections = set()
for service in stages:
param_selections.update(service.param_selections)
param_selections = sorted(param_selections)
for stage in stages:
stage.select_params(param_selections, error_on_missing=False)
self._stages = stages
self.setup()
[docs]
def get_outputs(self, **get_outputs_kwargs):
"""Wrapper around `_get_outputs`. The latter might
have quite some overhead compared to `run` alone"""
if self.profile:
start_t = time()
outputs = self._get_outputs(**get_outputs_kwargs)
end_t = time()
self._get_outputs_times.append(end_t - start_t)
else:
outputs = self._get_outputs(**get_outputs_kwargs)
return outputs
def _get_outputs_multdimbinning(self, output_binning, output_key):
"""Logic that produces a single `MapSet` when the pipeline's
output binning is a regular `MultiDimBinning`.
Returns
-------
outputs : MapSet
"""
self.data.representation = output_binning
if isinstance(output_key, tuple):
assert len(output_key) == 2
outputs = self.data.get_mapset(output_key[0], error=output_key[1])
else:
outputs = self.data.get_mapset(output_key)
return outputs
def _get_outputs_varbinning(self, output_binning, output_key):
"""Logic that produces multiple `MapSet`s when the pipeline's
output binning is a `VarBinning`.
Returns
-------
outputs : list of MapSet
"""
assert self.data.representation == "events"
outputs = []
selections = output_binning.selections
for i in range(output_binning.nselections):
# there will be a new ContainerSet created for each selection
containers = []
for c in self.data.containers:
cc = Container(name=c.name)
# Find the events that belong to the given selection, depending on
# type of selection.
# TODO: consider optimisations such as caching these masks?
if isinstance(selections, list):
keep = c.get_keep_mask(selections[i])
else:
assert isinstance(selections, OneDimBinning)
cut_var = c[selections.name]
# cut on bin edges
keep = (cut_var >= selections.edge_magnitudes[i]) & (cut_var < selections.edge_magnitudes[i+1])
for var_name in output_binning.binnings[i].names:
# Store the selected var_name entries (corresponding to the
# dimensions in which the data for this selection will be
# binned) in the fresh Container
cc[var_name] = c[var_name][keep]
# store the quantities that will populate each bin
if isinstance(output_key, tuple):
assert len(output_key) == 2
cc[output_key[0]] = c[output_key[0]][keep]
cc.tranlation_modes[output_key[0]] = 'sum'
cc[output_key[1]] = np.square(c[output_key[0]][keep])
cc.tranlation_modes[output_key[1]] = 'sum'
else:
cc[output_key] = c[output_key][keep]
cc.tranlation_modes[output_key] = 'sum'
containers.append(cc)
dat = ContainerSet(
name=self.data.name,
containers=containers,
representation=output_binning.binnings[i],
)
if isinstance(output_key, tuple):
for c in dat.containers:
# uncertainties
c[output_key[1]] = np.sqrt(c[output_key[1]])
outputs.append(dat.get_mapset(output_key[0], error=output_key[1]))
else:
outputs.append(dat.get_mapset(output_key))
return outputs
def _get_outputs(self, output_binning=None, output_key=None):
"""Get MapSet output"""
original_binning = None
if output_binning is None:
self.run()
output_binning = self.output_binning
elif isinstance(output_binning, MultiDimBinning):
original_binning = self.output_binning
self.output_binning = output_binning
self.run()
elif isinstance(output_binning, VarBinning):
# Only have to check exclusivity in case external output binning
# is requested
self.assert_exclusive_varbinning(output_binning=output_binning)
if output_key is None:
output_key = self.output_key
assert(isinstance(output_binning, (MultiDimBinning, VarBinning)))
if isinstance(output_binning, MultiDimBinning):
outputs = self._get_outputs_multdimbinning(output_binning, output_key)
elif isinstance(output_binning, VarBinning):
# Any contained stages' apply_modes could have changed, whether
# an external output binning is specified here or not
self.assert_varbinning_compat()
outputs = self._get_outputs_varbinning(output_binning, output_key)
if original_binning is not None:
self.output_binning = original_binning
return outputs
[docs]
def add_covariance(self, covmat):
"""
Incorporates covariance between parameters.
This is done by replacing relevant correlated parameters with "DerivedParams"
that depend on new parameters in an uncorrelated basis
The parameters are all updated, but this doesn't add the new parameters in
So we go to the first stage we find that has one of the original parameters and manually add this in
"""
if self._covariance_set:
logging.warn("Tried to add covariance matrix while one is already here.")
logging.fatal("Add larger covariance matrix rather than calling this multiple times")
paramset = self.params
paramset.add_covariance(covmat)
self._covariance_set = True
# this should go and replace existing stage parameters with the new ones
self.update_params(paramset)
self._add_rotated(paramset)
def _add_rotated(self, paramset:ParamSet, suppress_warning=False):
"""
Used to manually add in the new, rotated parameters
"""
# now we need to add in the new, rotated, parameters
# we want to add the new rotated parameters into a stage that had the correlated parameter
# it doesn't really matter where these uncorrelated parameters go, all stages
# that need to are already using those Derived Params
derived_name = ""
for param in paramset:
if isinstance(param, DerivedParam):
derived_name = param.name
depends = param.dependson
break
if len(depends.keys())==0:
if not suppress_warning:
logging.warn("Added covariance matrix but found no Derived Params")
return False
success = True
# now we find where a derived parameter lives
for stage in self.stages:
included = stage._param_selector.params.names
if derived_name in included:
success = True
# TODO incorporate selector !!
stage._param_selector.update(paramset)
return success
[docs]
def run(self):
"""Wrapper around `_run_function`"""
apply_modes = [s.apply_mode for s in self.stages]
if apply_modes != self._apply_modes:
# possible that stage apply_modes got manipulated in between runs
self.assert_apply_modes_consistency()
if self.profile:
start_t = time()
self._run_function()
end_t = time()
self._run_times.append(end_t - start_t)
else:
self._run_function()
# record apply_modes for this run so we can detect potential changes
self._apply_modes = apply_modes
def _run_function(self):
"""Run the pipeline to compute"""
for stage in self.stages:
logging.debug(f"Working on stage {stage.stage_name}.{stage.service_name}")
stage.run()
[docs]
def setup(self):
"""Wrapper around `_setup_function`"""
if self.profile:
start_t = time()
self._setup_function()
end_t = time()
self._setup_times.append(end_t - start_t)
else:
self._setup_function()
def _setup_function(self):
"""Setup (reset) all stages"""
self.data = ContainerSet(self.name)
for stage in self.stages:
stage.data = self.data
stage.setup()
[docs]
def update_params(self, params, existing_must_match=False, extend=False):
"""Update params for the pipeline.
Note that any param in `params` in excess of those that already exist
in the pipeline's stages will have no effect.
Parameters
----------
params : ParamSet
Parameters to be updated
existing_must_match : bool
extend : bool
"""
for stage in self:
stage._param_selector.update(params, existing_must_match=existing_must_match, extend=extend)
#stage.params.update(params, existing_must_match=existing_must_match, extend=extend)
[docs]
def select_params(self, selections, error_on_missing=False):
"""Select a set of alternate param values/specifications.
Parameters
-----------
selections : string or iterable of strings
error_on_missing : bool
Raises
------
KeyError if `error_on_missing` is `True` and any of `selections` does
not exist in any stage in the pipeline.
"""
successes = 0
for stage in self:
try:
stage.select_params(selections, error_on_missing=True)
except KeyError:
pass
else:
successes += 1
if error_on_missing and successes == 0:
raise KeyError(
"None of the stages in this pipeline has all of the"
" selections %s available." % (selections,)
)
@property
def params(self):
"""pisa.core.param.ParamSet : pipeline's parameters"""
params = ParamSet()
for stage in self:
params.extend(stage.params)
return params
@property
def param_selections(self):
"""list of strings : param selections collected from all stages"""
selections = set()
for stage in self:
selections.update(stage.param_selections)
return sorted(selections)
@property
def stages(self)->'list[Stage]':
"""list of Stage : stages in the pipeline"""
return [s for s in self]
@property
def stage_names(self):
"""list of strings : names of stages in the pipeline"""
return [s.stage_name for s in self]
@property
def service_names(self):
"""list of strings : names of services in the pipeline"""
return [s.service_name for s in self]
@property
def config(self):
"""Deepcopy of the OrderedDict used to instantiate the pipeline"""
return deepcopy(self._config)
@property
def source_code_hash(self):
"""Hash for the source code of this object's class.
Not meant to be perfect, but should suffice for tracking provenance of
an object stored to disk that were produced by a Stage.
"""
if self._source_code_hash is None:
self._source_code_hash = hash_obj(getsource(self.__class__))
return self._source_code_hash
@property
def hash(self):
"""int : Hash of the state of the pipeline. This hashes together a hash
of the Pipeline class's source code and a hash of the state of each
contained stage."""
return hash_obj([self.source_code_hash] + [stage.hash for stage in self])
def __hash__(self):
return self.hash
[docs]
def assert_apply_modes_consistency(self):
"""Asserts that pipeline setup does not result in non-sensical
transformations between Maps or from Maps to events
(cf. Container.translate: rebinning without averaging is not
implemented because of lacking benefits, and similarly going
from a histogram to events).
"""
ref_binning = None
ref_name = None
for s in self.stages:
# it suffices to start from the first occurrence of a MultiDimBinning
if isinstance(s.apply_mode, MultiDimBinning) and ref_binning is None:
ref_binning = s.apply_mode
ref_name = f"{s.stage_name}.{s.service_name}"
elif ref_binning is not None and s.apply_mode != ref_binning:
raise ValueError(
f"Stage {s.stage_name}.{s.service_name} has '{s.apply_mode}'"
" as apply_mode, which deviates from a previously detected "
f"MultiDimBinning apply_mode, of stage {ref_name}. This "
"configuration would result in an unreliable pipeline output."
)
[docs]
def assert_varbinning_compat(self):
"""Asserts that pipeline setup is compatible with `VarBinning`:
all stages need to apply to events (this precludes use with
any histogramming service, which requires a binning as apply_mode).
Raises
------
ValueError
if at least one stage has apply_mode!='events'
"""
incompat = []
for s in self.stages:
if not s.apply_mode == 'events':
incompat.append(s)
if len(incompat) >= 1:
str_incompat = ", ".join(
[f"{stage.stage_name}.{stage.service_name}" for stage in incompat]
)
raise ValueError(
"When a variable binning is used, all stages need to set "
f"apply_mode='events', but '{str_incompat}' of '{self.name}' "
"do(es) not!"
)
[docs]
def assert_exclusive_varbinning(self, output_binning=None):
"""Assert that `VarBinning` selections are mutually exclusive.
This is done individually for each `Container` in `self.data`.
Parameters
-----------
output_binning : None, MultiDimBinning, VarBinning
Raises
------
ValueError
if a `VarBinning` is tested and at least two selections
(if applicable) are not mutually exclusive
"""
if output_binning is None:
selections = self.output_binning.selections
nselections = self.output_binning.nselections
else:
selections = output_binning.selections
nselections = output_binning.nselections
if isinstance(selections, list):
# list of selection-criteria strings
# perform and report sanity checks on selected event counts
# total count per selection across all containers
tot_sel_counts = {sel: 0 for sel in selections}
for c in self.data:
keep = np.zeros(c.size)
# Looping over all selections for a fixed container is
# sufficient to detect overlaps
for sel in selections:
keep_mask = c.get_keep_mask(sel)
keep += keep_mask
# number of events selected from this container
sel_count = np.sum(keep_mask)
logging.debug(f"'{c.name}' selected by '{sel}': {sel_count}")
tot_sel_counts[sel] += sel_count
if not np.all(keep <= 1):
raise ValueError(
f"Selections {selections} are not mutually exclusive for "
f"'{c.name}' (at least) in pipeline '{self.name}'!"
)
# Warn on empty selections (don't assume that this must be an error)
empty_sels = [sel for sel in selections if tot_sel_counts[sel] == 0]
if empty_sels:
empty_sels_str = ", ".join(empty_sels)
logging.warning(
f"There are empty selections in pipeline '{self.name}': "
f"'{empty_sels_str}'"
)
@property
def output_binning(self):
return self._output_binning
@output_binning.setter
def output_binning(self, binning):
if isinstance(binning, VarBinning):
self.assert_varbinning_compat()
self.assert_exclusive_varbinning(output_binning=binning)
self._output_binning = binning
if isinstance(binning, MultiDimBinning):
for s in self.stages:
if isinstance(s.apply_mode, MultiDimBinning):
s.apply_mode = binning
self.setup()
[docs]
def test_Pipeline():
"""Unit tests for Pipeline class"""
# TODO: make a test config file with hierarchy AND material selector,
# uncomment / add in tests commented / removed below
#
# Test: select_params and param_selections
#
hierarchies = ["nh", "ih"]
#materials = ["iron", "pyrolite"]
materials = []
t23 = dict(ih=49.5 * ureg.deg, nh=42.3 * ureg.deg)
#YeO = dict(iron=0.4656, pyrolite=0.4957)
# Instantiate with two pipelines: first has both nh/ih and iron/pyrolite
# param selectors, while the second only has nh/ih param selectors.
pipeline = Pipeline("settings/pipeline/example.cfg") # pylint: disable=redefined-outer-name
#current_mat = "iron"
current_hier = "nh"
for new_hier, new_mat in product(hierarchies, materials):
#_ = YeO[new_mat]
assert pipeline.param_selections == sorted([current_hier]), str(
pipeline.param_selections
)
assert pipeline.params.theta23.value == t23[current_hier], str(
pipeline.params.theta23
)
#assert pipeline.params.YeO.value == YeO[current_mat], str(pipeline.params.YeO)
# Select just the hierarchy
pipeline.select_params(new_hier)
assert pipeline.param_selections == sorted([new_hier]), str(
pipeline.param_selections
)
assert pipeline.params.theta23.value == t23[new_hier], str(
pipeline.params.theta23
)
#assert pipeline.params.YeO.value == YeO[current_mat], str(pipeline.params.YeO)
## Select just the material
#pipeline.select_params(new_mat)
#assert pipeline.param_selections == sorted([new_hier, new_mat]), str(
# pipeline.param_selections
#)
assert pipeline.params.theta23.value == t23[new_hier], str(
pipeline.params.theta23
)
#assert pipeline.params.YeO.value == YeO[new_mat], str(pipeline.params.YeO)
# Reset both to "current"
pipeline.select_params([current_hier])
assert pipeline.param_selections == sorted([current_hier]), str(
pipeline.param_selections
)
assert pipeline.params.theta23.value == t23[current_hier], str(
pipeline.params.theta23
)
#assert pipeline.params.YeO.value == YeO[current_mat], str(pipeline.params.YeO)
## Select both hierarchy and material
#pipeline.select_params([new_hier])
#assert pipeline.param_selections == sorted([new_hier, new_mat]), str(
# pipeline.param_selections
#)
#assert pipeline.params.theta23.value == t23[new_hier], str(
# pipeline.params.theta23
#)
#assert pipeline.params.YeO.value == YeO[new_mat], str(pipeline.params.YeO)
#current_hier = new_hier
#current_mat = new_mat
#
# Test: detection of inconsistent apply_mode combinations
#
binned_apply_mode = pipeline.output_binning
assert isinstance(binned_apply_mode, MultiDimBinning)
pipeline.stages[1].apply_mode = binned_apply_mode
try:
out = pipeline.get_outputs()
except ValueError:
# Needs to fail: going from a binned output (after flux) to events
# (after osc.)
pass
else:
assert False
#
# Test: passing a custom output binning to get_outputs
#
pipeline.stages[1].apply_mode = "events"
# first get the original event distribution as reference
out = pipeline.get_outputs()
counts_tot = sum(out.num_entries.values())
# use an oversampled output binning instead and ensure identical total count
out2 = pipeline.get_outputs(output_binning=pipeline.output_binning.oversample(2))
counts_tot2 = sum(out2.num_entries.values())
assert np.isclose(counts_tot2, counts_tot)
#
# Test: a pipeline using a VarBinning
#
p = Pipeline("settings/pipeline/varbin_example.cfg")
out = p.get_outputs()
# a split into two event selections has to result in two MapSets
assert len(out) == 2
# a binned apply_mode has to result in a ValueError
# first get a pre-existing binning
binned_calc_mode = p.stages[2].calc_mode
assert isinstance(binned_calc_mode, MultiDimBinning)
p.stages[2].apply_mode = binned_calc_mode
try:
out = p.get_outputs()
except ValueError:
pass
else:
assert False
# also verify that pipeline correctly detects non-exclusive selections
p.stages[2].apply_mode = "events"
vb = p.output_binning
# define a selection string which will simply be repeated
assert vb.nselections > 1
sel = ["pid > 0"] * vb.nselections
invalid_vb = VarBinning(binnings=vb.binnings, selections=sel)
try:
p.output_binning = invalid_vb
except ValueError:
pass
else:
assert False
# pipeline should accept any empty selections
sel[-1] = "pid > np.inf"
invalid_vb = VarBinning(binnings=vb.binnings, selections=sel)
p.output_binning = invalid_vb
# ----- Most of this below cang go (?) ---
[docs]
def parse_args():
"""Parse command line arguments if `pipeline.py` is called as a script."""
parser = ArgumentParser(
# formatter_class=ArgumentDefaultsHelpFormatter,
description="""Instantiate and run a pipeline from a config file.
Optionally store the resulting distribution(s) and plot(s) to disk."""
)
required = parser.add_argument_group("required arguments")
required.add_argument(
"-p",
"--pipeline",
metavar="CONFIGFILE",
type=str,
required=True,
help="File containing settings for the pipeline.",
)
parser.add_argument(
"-a",
"--arg",
metavar="SECTION ARG=VAL",
nargs="+",
action="append",
help="""Set config arg(s) manually. SECTION can be e.g.
"stage:<stage_name>" (like "stage:flux", "stage:reco", etc.),
"pipeline", and so forth. Arg values specified here take precedence
over those in the config file, but note that the sections specified
must already exist in the config file.""",
)
parser.add_argument(
"--select",
metavar="PARAM_SELECTIONS",
nargs="+",
default=None,
help="""Param selectors (separated by spaces) to use to override any
defaults in the config file.""",
)
parser.add_argument(
"--inputs",
metavar="FILE",
type=str,
help="""File from which to read inputs to be fed to the pipeline.""",
)
parser.add_argument(
"--only-stage",
metavar="STAGE",
type=str,
help="""Test stage: Instantiate a single stage in the pipeline
specification and run it in isolation (as the sole stage in a
pipeline). If it is a stage that requires inputs, these can be
specified with the --infile argument, or else dummy stage input maps
(numpy.ones(...), matching the input binning specification) are
generated for testing purposes.""",
)
parser.add_argument(
"--stop-after-stage",
metavar="STAGE",
help="""Instantiate a pipeline up to and including STAGE, but stop
there. Can specify a stage by index in the pipeline config (e.g., 0, 1,
etc.) or name (e.g., flux, osc, etc.)""",
)
parser.add_argument(
"--outdir",
metavar="DIR",
type=str,
help="""Store all output files (data and plots) to this directory.
Directory will be created (including missing parent directories) if it
does not exist already. If no dir is provided, no outputs will be
saved.""",
)
parser.add_argument(
"--intermediate",
action="store_true",
help="""Store all intermediate outputs, not just the final stage's
outputs.""",
)
parser.add_argument("--pdf", action="store_true", help="""Produce pdf plot(s).""")
parser.add_argument("--png", action="store_true", help="""Produce png plot(s).""")
parser.add_argument(
"--annotate", action="store_true", help="""Annotate plots with counts per bin"""
)
parser.add_argument(
"-v",
action="count",
default=None,
help="""Set verbosity level. Repeat for increased verbosity. -v is
info-level, -vv is debug-level and -vvv is trace-level output.""",
)
args = parser.parse_args()
return args
[docs]
def main(return_outputs=False):
"""Main; call as script with `return_outputs=False` or interactively with
`return_outputs=True`
FIXME: This is broken in various ways (easiest fix:
pipeline.get_outputs() has no idx parameter anymore)
"""
from pisa.utils.plotter import Plotter
args = parse_args()
set_verbosity(args.v)
# Even if user specifies an integer on command line, it comes in as a
# string. Try to convert to int (e.g. if `'1'` is passed to indicate the
# second stage), and -- if successful -- use this as `args.only_stage`.
# Otherwise, the string value passed will be used (e.g. `'osc'` could be
# passed).
try:
only_stage_int = int(args.only_stage)
except (ValueError, TypeError):
pass
else:
args.only_stage = only_stage_int
if args.outdir:
mkdir(args.outdir)
else:
if args.pdf or args.png:
raise ValueError("No --outdir provided, so cannot save images.")
# Most basic parsing of the pipeline config (parsing only to this level
# allows for simple strings to be specified as args for updating)
bcp = PISAConfigParser()
bcp.read(args.pipeline)
# Update the config with any args specified on command line
if args.arg is not None:
for arg_list in args.arg:
if len(arg_list) < 2:
raise ValueError(
'Args must be formatted as: "section arg=val". Got "%s"'
" instead." % " ".join(arg_list)
)
section = arg_list[0]
remainder = " ".join(arg_list[1:])
eq_split = remainder.split("=")
newarg = eq_split[0].strip()
value = ("=".join(eq_split[1:])).strip()
logging.debug(
'Setting config section "%s" arg "%s" = "%s"', section, newarg, value
)
try:
bcp.set(section, newarg, value)
except NoSectionError:
logging.error(
'Invalid section "%s" specified. Must be one of %s',
section,
bcp.sections(),
)
raise
# Instantiate the pipeline
pipeline = Pipeline(bcp) # pylint: disable=redefined-outer-name
if args.select is not None:
pipeline.select_params(args.select, error_on_missing=True)
if args.only_stage is None:
stop_idx = args.stop_after_stage
try:
stop_idx = int(stop_idx)
except (TypeError, ValueError):
pass
if isinstance(stop_idx, str):
stop_idx = pipeline.index(stop_idx)
outputs = pipeline.get_outputs( # pylint: disable=redefined-outer-name
idx=stop_idx
)
if stop_idx is not None:
stop_idx += 1
indices = slice(0, stop_idx)
else:
assert args.stop_after_stage is None
idx = pipeline.index(args.only_stage)
stage = pipeline[idx]
indices = slice(idx, idx + 1)
# Create dummy inputs if necessary
inputs = None
if hasattr(stage, "input_binning"):
logging.warning(
"Stage requires input, so building dummy"
" inputs of random numbers, with random state set to the input"
" index according to alphabetical ordering of input names and"
" filled in alphabetical ordering of dimension names."
)
input_maps = []
tmp = deepcopy(stage.input_binning)
alphabetical_binning = tmp.reorder_dimensions(sorted(tmp.names))
for input_num, input_name in enumerate(sorted(stage.input_names)):
# Create a new map with all 3's; name according to the input
hist = np.full(shape=alphabetical_binning.shape, fill_value=3.0)
input_map = Map(
name=input_name, binning=alphabetical_binning, hist=hist
)
# Apply Poisson fluctuations to randomize the values in the map
input_map.fluctuate(method="poisson", random_state=input_num)
# Reorder dimensions according to user's original binning spec
input_map.reorder_dimensions(stage.input_binning)
input_maps.append(input_map)
inputs = MapSet(maps=input_maps, name="ones", hash=1)
outputs = stage.run(inputs=inputs)
for stage in pipeline[indices]:
if not args.outdir:
break
stg_svc = stage.stage_name + "__" + stage.service_name
fbase = os.path.join(args.outdir, stg_svc)
if args.intermediate or stage == pipeline[indices][-1]:
stage.outputs.to_json(fbase + "__output.json.bz2")
# also only plot if args intermediate or last stage
if args.intermediate or stage == pipeline[indices][-1]:
formats = OrderedDict(png=args.png, pdf=args.pdf)
if isinstance(stage.outputs, Data):
# TODO(shivesh): plots made here will use the most recent
# "pisa_weight" column and so all stages will have identical plots
# (one workaround is to turn on "memcache_deepcopy")
# TODO(shivesh): intermediate stages have no output binning
if stage.output_binning is None:
logging.debug("Skipping plot of intermediate stage %s", stage)
continue
outputs = stage.outputs.histogram_set(
binning=stage.output_binning,
nu_weights_col="pisa_weight",
mu_weights_col="pisa_weight",
noise_weights_col="pisa_weight",
mapset_name=stg_svc,
errors=True,
)
try:
for fmt, enabled in formats.items():
if not enabled:
continue
my_plotter = Plotter(
stamp="Event rate",
outdir=args.outdir,
fmt=fmt,
log=False,
annotate=args.annotate,
)
my_plotter.ratio = True
my_plotter.plot_2d_array(
outputs, fname=stg_svc + "__output", cmap="RdBu"
)
except ValueError as exc:
logging.error(
"Failed to save plot to format %s. See exception" " message below",
fmt,
)
traceback.format_exc()
logging.exception(exc)
logging.warning("I can't go on, I'll go on.")
if return_outputs:
return pipeline, outputs
if __name__ == "__main__":
pipeline, outp = main(return_outputs=True)