"""
Functions to retrieve the bin index for a 1- to 3-dimensional sample.
Functions were adapted from translation.py
Notes
-----
The binning convention in PISA (from numpy.histogramdd) is that the lower edge
is inclusive and upper edge is exclusive for a given bin, except for the
upper-most bin whose upper edge is also inclusive. Visually, for 1D:
[ bin 0 ) [ bin 1 ) ... [ bin num_bins - 1 ]
First bin is index = 0 and last bin is index = (num_bins - 1)
* Values below the lowermost-edge of any dimension's binning return index = -1
* NaN values return index = -1
* Otherwise, values above the uppermost-edge of any dimension's binning return
index = num_bins
"""
from __future__ import absolute_import, print_function, division
import numpy as np
from numba import guvectorize
from pisa import FTYPE, TARGET
from pisa.core.binning import OneDimBinning, MultiDimBinning
from pisa.core.translation import find_index
from pisa.utils.log import logging, set_verbosity
__all__ = ["lookup_indices", "test_lookup_indices"]
FX = "f4" if FTYPE == np.float32 else "f8"
@guvectorize(
[f"({FX}[:], {FX}[:], i8[:])"],
"(), (j) -> ()",
target=TARGET,
)
def lookup_indices_vectorized_1d(sample_x, bin_edges_x, out):
"""Lookup bin indices for sample_x values, where binning is defined by
`bin_edges_x`."""
out[0] = find_index(sample_x[0], bin_edges_x)
@guvectorize(
[f"({FX}[:], {FX}[:], {FX}[:], {FX}[:], i8[:])"],
"(), (), (a), (b) -> ()",
target=TARGET,
)
def lookup_indices_vectorized_2d(sample_x, sample_y, bin_edges_x, bin_edges_y, out):
"""Same as above, except we get back the index"""
idx_x = find_index(sample_x[0], bin_edges_x)
idx_y = find_index(sample_y[0], bin_edges_y)
n_x_bins = len(bin_edges_x) - 1
n_y_bins = len(bin_edges_y) - 1
n_bins = n_x_bins * n_y_bins
if idx_x == -1 or idx_y == -1:
# any dim underflowed
out[0] = -1
elif idx_x == n_x_bins or idx_y == n_y_bins:
# any dim overflowed
out[0] = n_bins
else:
out[0] = idx_x * n_y_bins + idx_y
@guvectorize(
[f"({FX}[:], {FX}[:], {FX}[:], {FX}[:], {FX}[:], {FX}[:], i8[:])"],
"(), (), (), (a), (b), (c) -> ()",
target=TARGET,
)
def lookup_indices_vectorized_3d(
sample_x, sample_y, sample_z, bin_edges_x, bin_edges_y, bin_edges_z, out
):
"""Vectorized gufunc to perform the lookup"""
idx_x = find_index(sample_x[0], bin_edges_x)
idx_y = find_index(sample_y[0], bin_edges_y)
idx_z = find_index(sample_z[0], bin_edges_z)
n_x_bins = len(bin_edges_x) - 1
n_y_bins = len(bin_edges_y) - 1
n_z_bins = len(bin_edges_z) - 1
n_bins = n_x_bins * n_y_bins * n_z_bins
if idx_x == -1 or idx_y == -1 or idx_z == -1:
# any dim underflowed
out[0] = -1
elif idx_x == n_x_bins or idx_y == n_y_bins or idx_z == n_z_bins:
# any dim overflowed
out[0] = n_bins
else:
out[0] = (idx_x * n_y_bins + idx_y) * n_z_bins + idx_z
[docs]
def lookup_indices(sample, binning):
"""Lookup (flattened) bin index for sample points.
Parameters
----------
sample : length-M_dimensions sequence of length-N_events arrays
All smart arrays must have the same lengths; corresponding elements of
the arrays are the coordinates of an event in the dimensions each array
represents.
binning : pisa.core.binning.MultiDimBinning or convertible thereto
`binning` is passed to instantiate ``MultiDimBinning``, so e.g., a
pisa.core.binning.OneDimBinning is valid to pass as `binning`
Returns
-------
indices : length-N_events arrays
One for each event the index of the histogram in which it falls into
Notes
-----
this method works for 1d, 2d and 3d histogram only
"""
# Convert non-MultiDimBinning objects into MultiDimBinning if possible;
# if this fails, an error will result, as it should
binning = MultiDimBinning(binning)
if len(sample) != binning.num_dims:
raise ValueError(
f"`binning` has {binning.num_dims} dimension(s), but `sample`"
f" contains {len(sample)} arrays (so represents {len(sample)}"
f" dimensions)"
)
lookup_funcs = {
1: lookup_indices_vectorized_1d,
2: lookup_indices_vectorized_2d,
3: lookup_indices_vectorized_3d,
}
if binning.num_dims not in lookup_funcs:
raise NotImplementedError(
"binning must have num_dims in {}; got {}".format(
sorted(lookup_funcs.keys()), binning.num_dims
)
)
lookup_func = lookup_funcs[binning.num_dims]
lookup_func_args = (
[a for a in sample]
+ [dim.edge_magnitudes.astype(FTYPE) for dim in binning]
)
logging.trace("lookup_func_args = {}".format(lookup_func_args))
# Create an array to store the results
indices = np.empty_like(sample[0], dtype=np.int64)
# Perform the lookup
lookup_func(*lookup_func_args, out=indices)
return indices
[docs]
def test_lookup_indices():
"""Unit tests for `lookup_indices` function"""
#
# Test a variety of points.
# Points falling exactly on the bound are included in the
#
n_evts = 100
x = np.array([-5, 0.5, 1.5, 7.0, 6.5, 8.0, 6.5], dtype=FTYPE)
y = np.array([-5, 0.5, 1.5, 1.5, 3.0, 1.5, 2.5], dtype=FTYPE)
z = np.array([-5, 0.5, 1.5, 1.5, 0.5, 6.0, 0.5], dtype=FTYPE)
w = np.ones(n_evts, dtype=FTYPE)
binning_x = OneDimBinning(name="x", num_bins=7, is_lin=True, domain=[0, 7])
binning_y = OneDimBinning(name="y", num_bins=4, is_lin=True, domain=[0, 4])
binning_z = OneDimBinning(name="z", num_bins=2, is_lin=True, domain=[0, 2])
binning_1d = binning_x
binning_2d = binning_x * binning_y
binning_3d = binning_x * binning_y * binning_z
# 1D case: check that each event falls into its predicted bin
#
# All values higher or equal to the last bin edges are assigned an index of zero
#
logging.trace("TEST 1D:")
logging.trace("Total number of bins: {}".format(7))
logging.trace("array in 1D: {}".format(x))
logging.trace("Binning: {}".format(binning_1d.bin_edges[0]))
indices = lookup_indices([x], binning_1d)
logging.trace("indices of each array element: {}".format(indices))
logging.trace("*********************************")
test = indices
ref = np.array([-1, 0, 1, 6, 6, 7, 6])
assert np.array_equal(test, ref), "test={} != ref={}".format(test, ref)
# 2D case:
#
# The binning edges are flattened as follows:
# [(x=0, y=0), (x=0, y=1), (x=1, y=0), ...]
#
logging.trace("TEST 2D:")
logging.trace("Total number of bins: {}".format(7 * 4))
logging.trace("array in 2D: {}".format(list(zip(x, y))))
logging.trace("Binning: {}".format(binning_2d.bin_edges))
indices = lookup_indices([x, y], binning_2d)
logging.trace("indices of each array element: {}".format(indices))
logging.trace("*********************************")
test = indices
ref = np.array([-1, 0, 5, 25, 27, 28, 26])
assert np.array_equal(test, ref), "test={} != ref={}".format(test, ref)
# 3D case:
#
# the binning edges are flattened as follows:
# [(x=0, y=0, z=0), (x=0, y=0, z=1), (x=0, y=1, z=0)...]
#
logging.trace("TEST 3D:")
logging.trace("Total number of bins: {}".format(7 * 4 * 2))
logging.trace("array in 3D: {}".format(list(zip(x, y, z))))
logging.trace("Binning: {}".format(binning_3d.bin_edges))
indices = lookup_indices([x, y, z], binning_3d)
logging.trace("indices of each array element: {}".format(indices))
logging.trace("*********************************")
test = indices
ref = np.array([-1, 0, 11, 51, 54, 56, 52])
assert np.array_equal(test, ref), "test={} != ref={}".format(test, ref)
logging.info("<< PASS : test_lookup_indices >>")
if __name__ == "__main__":
set_verbosity(1)
test_lookup_indices()