# pylint: disable=unsubscriptable-object, too-many-function-args, not-callable, unexpected-keyword-arg, no-value-for-parameter, too-many-boolean-expressions
"""
Module for data representation translation methods
"""
# TODO:
# - right now we distinguish on histogramming/lookup for scalars (normal) or array, which means that instead
# of just a single value per e.g. histogram bin, there can be an array of values
# This should be made more general that one function can handle everything...since now we have several
# functions doing similar things. not very pretty
from __future__ import absolute_import, print_function, division
from copy import deepcopy
import numpy as np
from numba import guvectorize
import numba
from numba import njit, prange
# When binnings are fully regular, we can use this for super speed
import fast_histogram as fh
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor
from pisa import FTYPE, TARGET, PISA_NUM_THREADS
from pisa.core.binning import OneDimBinning, MultiDimBinning
from pisa.utils.comparisons import recursiveEquality
from pisa.utils.log import logging, set_verbosity
from pisa.utils.numba_tools import myjit
from pisa.utils import vectorizer
from pisa.utils.profiler import line_profile, profile
__all__ = [
'resample',
'histogram',
'lookup',
'find_index',
'find_index_unsafe',
'test_histogram',
'test_find_index',
]
FX = 'f4' if FTYPE == np.float32 else 'f8'
# --------- resampling ------------
[docs]
def resample(weights, old_sample, old_binning, new_sample, new_binning):
"""Resample binned data with a given binning into any arbitrary
`new_binning`
Parameters
----------
weights : np.ndarray
old_sample : list of np.ndarray
old_binning : PISA MultiDimBinning
new_sample : list of np.ndarray
new_binning : PISA MultiDimBinning
Returns
-------
new_hist_vals
"""
if old_binning.names != new_binning.names:
raise ValueError(f'cannot translate betwen {old_binning} and {new_binning}')
# This is a two step process: first histogram the weights into the new binning
# and keep the flat_hist_counts
flat_hist = histogram_np(old_sample, weights, new_binning, apply_weights=True)
flat_hist_counts = histogram_np(old_sample, weights, new_binning, apply_weights=False)
with np.errstate(divide='ignore', invalid='ignore'):
flat_hist /= flat_hist_counts
flat_hist = np.nan_to_num(flat_hist)
# now do the inverse, a lookup of hist vals at `new_sample` points
new_hist_vals = lookup(new_sample, weights, old_binning)
# Now, for bin we have 1 or less counts, take the lookedup value instead:
mask = flat_hist_counts > 1
new_hist_vals[mask] = flat_hist[mask]
return new_hist_vals
# --------- histogramming methods ---------------
[docs]
def histogram(sample, weights, binning, averaged, apply_weights=True):
"""Histogram `sample` points, weighting by `weights`, according to `binning`.
Parameters
----------
sample : list of np.ndarray
weights : np.ndarray
binning : PISA MultiDimBinning
averaged : bool
If True, the histogram entries are averages of the numbers that end up
in a given bin. This for example must be used when oscillation
probabilities are translated, otherwise we end up with
probability*count per bin
apply_weights : bool
wether to use weights or not
"""
if not isinstance(binning, MultiDimBinning):
raise ValueError("Binning should be a PISA MultiDimBinning")
if binning.is_irregular or not binning.is_lin:
flat_hist = histogram_np(sample, weights, binning, apply_weights=True)
else:
flat_hist = histogram_fh(sample, weights, binning, apply_weights=True)
if averaged:
if binning.is_irregular or not binning.is_lin:
flat_hist_counts = histogram_np(sample, weights, binning,
apply_weights=False)
else:
flat_hist_counts = histogram_fh(sample, weights, binning,
apply_weights=False)
with np.errstate(divide='ignore', invalid='ignore'):
flat_hist /= flat_hist_counts
flat_hist = np.nan_to_num(flat_hist)
return flat_hist
def _threaded_fh_histogramdd(sample, weights, bins, bin_range):
if not TARGET == "parallel":
return fh.histogramdd(sample=sample, weights=weights, bins=bins, range=bin_range)
splits = PISA_NUM_THREADS
with ThreadPoolExecutor(max_workers=splits) as pool:
chunk = len(sample[0]) // splits
chunked_sample = []
if weights is not None:
chunked_weights = []
ndim = len(sample)
for i in range(splits):
one_chunk = tuple(sample[j][i * chunk:(i+1) * chunk] for j in range(ndim))
chunked_sample.append(one_chunk)
if weights is not None:
chunked_weights.append(weights[i * chunk:(i+1) * chunk])
if weights is not None:
f = lambda s, w: fh.histogramdd(s, weights=w, bins=bins, range=bin_range)
results = pool.map(f, chunked_sample, chunked_weights)
else:
f = lambda s: fh.histogramdd(s, weights=None, bins=bins, range=bin_range)
results = pool.map(f, chunked_sample)
results = sum(results)
return results
def histogram_fh(sample, weights, binning, apply_weights=True): # pylint: disable=missing-docstring
"""Helper function for fast_histogram historams.
This requires binnings to be fully regular and linear.
"""
if binning.is_irregular or not binning.is_lin:
raise ValueError("Binning should be linearly-regular to use the fast_histogram library.")
ranges = [b.domain.m for b in binning]
bins = binning.num_bins
if isinstance(sample, np.ndarray):
if sample.ndim == 1:
_sample = (sample,)
else:
_sample = tuple(s for s in sample.T)
elif isinstance(sample, Iterable):
_sample = tuple(s for s in sample)
else:
raise ValueError("Sample should be either an (N, D) array, or an (N,) array, "
"or a (D, N) array-like.")
if weights is not None and weights.ndim == 2:
# that means it's 1-dim data instead of scalars
hists = []
for i in range(weights.shape[1]):
w = weights[:, i] if apply_weights else None
hist = _threaded_fh_histogramdd(sample=_sample, weights=w, bins=bins, bin_range=ranges)
hists.append(hist.ravel())
flat_hist = np.stack(hists, axis=1)
else:
w = weights if apply_weights else None
hist = _threaded_fh_histogramdd(sample=_sample, weights=w, bins=bins, bin_range=ranges)
flat_hist = hist.ravel()
return flat_hist.astype(FTYPE)
def histogram_np(sample, weights, binning, apply_weights=True): # pylint: disable=missing-docstring
"""helper function for numpy historams"""
bin_edges = [edges.magnitude for edges in binning.bin_edges]
if weights is not None and weights.ndim == 2:
# that means it's 1-dim data instead of scalars
hists = []
for i in range(weights.shape[1]):
w = weights[:, i] if apply_weights else None
hist, _ = np.histogramdd(sample=sample, weights=w, bins=bin_edges)
hists.append(hist.ravel())
flat_hist = np.stack(hists, axis=1)
else:
w = weights if apply_weights else None
hist, _ = np.histogramdd(sample=sample, weights=w, bins=bin_edges)
flat_hist = hist.ravel()
return flat_hist.astype(FTYPE)
# ---------- Lookup methods ---------------
[docs]
def lookup(sample, flat_hist, binning):
"""The inverse of histograming: Extract the histogram values at `sample`
points.
Parameters
----------
sample : num_dims list of length-num_samples np.ndarray
Points at which to find histogram's values
flat_hist : np.ndarray
Histogram values
binning : num_dims MultiDimBinning
Histogram's binning
Returns
-------
hist_vals : len-num_samples np.ndarray
Notes
-----
Handles up to 3D.
"""
if not isinstance(binning, MultiDimBinning):
raise ValueError("Binning should be a PISA MultiDimBinning")
assert binning.num_dims <= 3, 'can only do up to 3D at the moment'
bin_edges = [edges.magnitude for edges in binning.bin_edges]
if not binning.is_irregular and binning.is_lin:
if flat_hist.ndim == 1:
hist_vals = np.zeros_like(sample[0])
if binning.num_dims == 1:
lookup_regular_1d(
sample[0],
flat_hist,
xmin=bin_edges[0][0],
xmax=bin_edges[0][-1],
nx=len(bin_edges[0]) - 1,
out=hist_vals,
)
elif binning.num_dims == 2:
lookup_regular_2d(
sample[0],
sample[1],
flat_hist,
xmin=bin_edges[0][0],
xmax=bin_edges[0][-1],
nx=len(bin_edges[0]) - 1,
ymin=bin_edges[1][0],
ymax=bin_edges[1][-1],
ny=len(bin_edges[1]) - 1,
out=hist_vals,
)
elif binning.num_dims == 3:
lookup_regular_3d(
sample[0],
sample[1],
sample[2],
flat_hist,
xmin=bin_edges[0][0],
xmax=bin_edges[0][-1],
nx=len(bin_edges[0]) - 1,
ymin=bin_edges[1][0],
ymax=bin_edges[1][-1],
ny=len(bin_edges[1]) - 1,
zmin=bin_edges[2][0],
zmax=bin_edges[2][-1],
nz=len(bin_edges[2]) - 1,
out=hist_vals,
)
return hist_vals
elif flat_hist.ndim == 2:
hist_shape = (sample[0].size, flat_hist.shape[1])
hist_vals = np.zeros(hist_shape, dtype=FTYPE)
if binning.num_dims == 1:
lookup_regular_1d_array(
sample[0],
flat_hist,
xmin=bin_edges[0][0],
xmax=bin_edges[0][-1],
nx=len(bin_edges[0]) - 1,
out=hist_vals,
)
elif binning.num_dims == 2:
lookup_regular_2d_array(
sample[0],
sample[1],
flat_hist,
xmin=bin_edges[0][0],
xmax=bin_edges[0][-1],
nx=len(bin_edges[0]) - 1,
ymin=bin_edges[1][0],
ymax=bin_edges[1][-1],
ny=len(bin_edges[1]) - 1,
out=hist_vals,
)
elif binning.num_dims == 3:
lookup_regular_3d_array(
sample[0],
sample[1],
sample[2],
flat_hist,
xmin=bin_edges[0][0],
xmax=bin_edges[0][-1],
nx=len(bin_edges[0]) - 1,
ymin=bin_edges[1][0],
ymax=bin_edges[1][-1],
ny=len(bin_edges[1]) - 1,
zmin=bin_edges[2][0],
zmax=bin_edges[2][-1],
nz=len(bin_edges[2]) - 1,
out=hist_vals,
)
return hist_vals
else:
raise NotImplementedError()
if flat_hist.ndim == 1:
#print 'looking up 1D'
hist_vals = np.zeros_like(sample[0])
if binning.num_dims == 1:
lookup_vectorized_1d(
sample[0],
flat_hist,
bin_edges[0],
out=hist_vals,
)
elif binning.num_dims == 2:
lookup_vectorized_2d(
sample[0],
sample[1],
flat_hist,
bin_edges[0],
bin_edges[1],
out=hist_vals,
)
elif binning.num_dims == 3:
lookup_vectorized_3d(
sample[0],
sample[1],
sample[2],
flat_hist,
bin_edges[0],
bin_edges[1],
bin_edges[2],
out=hist_vals,
)
elif flat_hist.ndim == 2:
#print 'looking up ND'
hist_vals = np.zeros((sample[0].size, flat_hist.shape[1]), dtype=FTYPE)
if binning.num_dims == 1:
lookup_vectorized_1d_arrays(
sample[0],
flat_hist,
bin_edges[0],
out=hist_vals,
)
elif binning.num_dims == 2:
lookup_vectorized_2d_arrays(
sample[0],
sample[1],
flat_hist,
bin_edges[0],
bin_edges[1],
out=hist_vals,
)
elif binning.num_dims == 3:
lookup_vectorized_3d_arrays(
sample[0],
sample[1],
sample[2],
flat_hist,
bin_edges[0],
bin_edges[1],
bin_edges[2],
out=hist_vals,
)
else:
raise NotImplementedError()
return hist_vals
@njit(parallel=True if TARGET == "parallel" else False)
def lookup_regular_1d(x, flat_hist, xmin, xmax, nx, out):
normx = nx / (xmax - xmin)
for idx in prange(len(out)):
if x[idx] >= xmin and x[idx] < xmax:
ix = (int)((x[idx] - xmin) * normx)
out[idx] = flat_hist[ix]
continue
out[idx] = 0.
@njit(parallel=True if TARGET == "parallel" else False)
def lookup_regular_2d(x, y, flat_hist, xmin, xmax, nx, ymin, ymax, ny, out):
normx = nx / (xmax - xmin)
normy = ny / (ymax - ymin)
for idx in prange(len(out)):
if x[idx] >= xmin and x[idx] < xmax:
if y[idx] >= ymin and y[idx] < ymax:
ix = (int)((x[idx] - xmin) * normx)
iy = (int)((y[idx] - ymin) * normy)
out[idx] = flat_hist[iy + ny*ix]
continue
out[idx] = 0.
@njit(parallel=True if TARGET == "parallel" else False)
def lookup_regular_3d(x, y, z, flat_hist, xmin, xmax, nx, ymin, ymax, ny,
zmin, zmax, nz, out):
normx = nx / (xmax - xmin)
normy = ny / (ymax - ymin)
normz = nz / (zmax - zmin)
for idx in prange(len(out)):
if x[idx] >= xmin and x[idx] < xmax:
if y[idx] >= ymin and y[idx] < ymax:
if z[idx] >= zmin and z[idx] < zmax:
ix = (int)((x[idx] - xmin) * normx)
iy = (int)((y[idx] - ymin) * normy)
iz = (int)((z[idx] - zmin) * normz)
out[idx] = flat_hist[iz + nz*iy + nz*ny*ix]
continue
out[idx] = 0.
@njit(parallel=True if TARGET == "parallel" else False)
def lookup_regular_1d_array(x, flat_hist, xmin, xmax, nx, out):
normx = nx / (xmax - xmin)
for idx in prange(len(out)):
if x[idx] >= xmin and x[idx] < xmax:
ix = (int)((x[idx] - xmin) * normx)
for d in range(flat_hist.shape[1]):
out[idx][d] = flat_hist[ix][d]
continue
for d in range(flat_hist.shape[1]):
out[idx][d] = 0.
@njit(parallel=True if TARGET == "parallel" else False)
def lookup_regular_2d_array(x, y, flat_hist, xmin, xmax, nx, ymin, ymax, ny, out):
normx = nx / (xmax - xmin)
normy = ny / (ymax - ymin)
for idx in prange(len(out)):
if x[idx] >= xmin and x[idx] < xmax:
if y[idx] >= ymin and y[idx] < ymax:
ix = (int)((x[idx] - xmin) * normx)
iy = (int)((y[idx] - ymin) * normy)
for d in range(flat_hist.shape[1]):
out[idx][d] = flat_hist[iy + ny*ix][d]
continue
for d in range(flat_hist.shape[1]):
out[idx][d] = 0.
@njit(parallel=True if TARGET == "parallel" else False)
def lookup_regular_3d_array(x, y, z, flat_hist, xmin, xmax, nx, ymin, ymax, ny,
zmin, zmax, nz, out):
normx = nx / (xmax - xmin)
normy = ny / (ymax - ymin)
normz = nz / (zmax - zmin)
for idx in prange(len(out)):
if x[idx] >= xmin and x[idx] < xmax:
if y[idx] >= ymin and y[idx] < ymax:
if z[idx] >= zmin and z[idx] < zmax:
ix = (int)((x[idx] - xmin) * normx)
iy = (int)((y[idx] - ymin) * normy)
iz = (int)((z[idx] - zmin) * normz)
for d in range(flat_hist.shape[1]):
out[idx][d] = flat_hist[iz + nz*iy + nz*ny*ix][d]
continue
for d in range(flat_hist.shape[1]):
out[idx][d] = 0.
@myjit
def find_index(val, bin_edges):
"""Find index in binning for `val`. If `val` is below binning range or is
nan, return -1; if `val` is above binning range, return num_bins. Edge
inclusivity/exclusivity is defined as .. ::
[ bin 0 ) [ bin 1 ) ... [ bin num_bins-1 ]
Using these indices to produce histograms should yield identical results
(ignoring underflow and overflow, which `find_index` has) that are
equivalent to those produced by ``numpy.histogramdd``.
Parameters
----------
val : scalar
Value for which to find bin index
bin_edges : 1d numpy ndarray of 2 or more scalars
Must be monotonically increasing, and all bins are assumed to be
adjacent
Returns
-------
bin_idx : int in [-1, num_bins]
-1 is returned for underflow or if `val` is nan. `num_bins` is returned
for overflow. Otherwise, for bin_edges[0] <= `val` <= bin_edges[-1],
0 <= `bin_idx` <= num_bins - 1
"""
num_edges = len(bin_edges)
num_bins = num_edges - 1
assert num_bins >= 1, 'bin_edges must define at least one bin'
underflow_idx = -1
overflow_idx = num_bins
if val >= bin_edges[0]:
if val <= bin_edges[-1]:
bin_idx = find_index_unsafe(val, bin_edges)
# Paranoia: In case of unforseen numerical issues, force clipping of
# returned bin index to [0, num_bins - 1] (any `val` outside of binning
# is already handled, so this should be valid)
bin_idx = min(max(0, bin_idx), num_bins - 1)
else:
bin_idx = overflow_idx
else: # either value is below first bin or is NaN
bin_idx = underflow_idx
return bin_idx
@myjit
def find_index_unsafe(val, bin_edges):
"""Find bin index of `val` within binning defined by `bin_edges`.
Validity of `val` and `bin_edges` is not checked.
Parameters
----------
val : scalar
Assumed to be within range of `bin_edges` (including lower and upper
bin edges)
bin_edges : array
Returns
-------
index
See also
--------
find_index : includes bounds checking and handling of special cases
"""
# Initialize to point to left-most edge
left_edge_idx = 0
# Initialize to point to right-most edge
right_edge_idx = len(bin_edges) - 1
while left_edge_idx < right_edge_idx:
# See where value falls w.r.t. an edge ~midway between left and right edges
# ``>> 1``: integer division by 2 (i.e., divide w/ truncation)
test_edge_idx = (left_edge_idx + right_edge_idx) >> 1
# ``>=``: bin left edges are inclusive
if val >= bin_edges[test_edge_idx]:
left_edge_idx = test_edge_idx + 1
else:
right_edge_idx = test_edge_idx
# break condition of while loop is that left_edge_idx points to the
# right edge of the bin that `val` is inside of; that is one more than
# that _bin's_ index
return left_edge_idx - 1
@guvectorize(
[f'({FX}[:], {FX}[:], {FX}[:], {FX}[:])'],
'(), (j), (k) -> ()',
target='cpu',
)
def lookup_vectorized_1d(
sample,
flat_hist,
bin_edges,
weights,
):
"""Vectorized gufunc to perform the lookup"""
x = sample[0]
if (bin_edges[0] <= x <= bin_edges[-1]):
idx = find_index_unsafe(x, bin_edges)
weights[0] = flat_hist[idx]
else: # outside of binning or nan
weights[0] = 0.
@guvectorize(
[f'({FX}[:], {FX}[:, :], {FX}[:], {FX}[:])'],
'(), (j, d), (k) -> (d)',
target='cpu',
)
def lookup_vectorized_1d_arrays(
sample,
flat_hist,
bin_edges,
weights,
):
"""Vectorized gufunc to perform the lookup"""
x = sample[0]
if (bin_edges[0] <= x <= bin_edges[-1]):
idx = find_index_unsafe(x, bin_edges)
for i in range(weights.size):
weights[i] = flat_hist[idx, i]
else: # outside of binning or nan
for i in range(weights.size):
weights[i] = 0.
@guvectorize(
[f'({FX}[:], {FX}[:], {FX}[:], {FX}[:], {FX}[:], {FX}[:])'],
'(), (), (j), (k), (l) -> ()',
target='cpu',
)
def lookup_vectorized_2d(
sample_x,
sample_y,
flat_hist,
bin_edges_x,
bin_edges_y,
weights,
):
"""Vectorized gufunc to perform the lookup"""
x = sample_x[0]
y = sample_y[0]
if (
x >= bin_edges_x[0]
and x <= bin_edges_x[-1]
and y >= bin_edges_y[0]
and y <= bin_edges_y[-1]
):
idx_x = find_index_unsafe(x, bin_edges_x)
idx_y = find_index_unsafe(y, bin_edges_y)
idx = idx_x * (len(bin_edges_y) - 1) + idx_y
weights[0] = flat_hist[idx]
else: # outside of binning or nan
weights[0] = 0.
@guvectorize(
[f'({FX}[:], {FX}[:], {FX}[:, :], {FX}[:], {FX}[:], {FX}[:])'],
'(), (), (j, d), (k), (l) -> (d)',
target='cpu',
)
def lookup_vectorized_2d_arrays(
sample_x,
sample_y,
flat_hist,
bin_edges_x,
bin_edges_y,
weights,
):
"""Vectorized gufunc to perform the lookup while flat hist and weights have
both a second dimension
"""
x = sample_x[0]
y = sample_y[0]
if (
x >= bin_edges_x[0]
and x <= bin_edges_x[-1]
and y >= bin_edges_y[0]
and y <= bin_edges_y[-1]
):
idx_x = find_index_unsafe(x, bin_edges_x)
idx_y = find_index_unsafe(y, bin_edges_y)
idx = idx_x * (len(bin_edges_y) - 1) + idx_y
for i in range(weights.size):
weights[i] = flat_hist[idx, i]
else: # outside of binning or nan
for i in range(weights.size):
weights[i] = 0.
@guvectorize(
[f'({FX}[:], {FX}[:], {FX}[:], {FX}[:], {FX}[:], {FX}[:], {FX}[:], {FX}[:])'],
'(), (), (), (j), (k), (l), (m) -> ()',
target='cpu',
)
def lookup_vectorized_3d(
sample_x,
sample_y,
sample_z,
flat_hist,
bin_edges_x,
bin_edges_y,
bin_edges_z,
weights,
):
"""Vectorized gufunc to perform the lookup"""
x = sample_x[0]
y = sample_y[0]
z = sample_z[0]
if (
x >= bin_edges_x[0]
and x <= bin_edges_x[-1]
and y >= bin_edges_y[0]
and y <= bin_edges_y[-1]
and z >= bin_edges_z[0]
and z <= bin_edges_z[-1]
):
idx_x = find_index_unsafe(x, bin_edges_x)
idx_y = find_index_unsafe(y, bin_edges_y)
idx_z = find_index_unsafe(z, bin_edges_z)
idx = (idx_x * (len(bin_edges_y) - 1) + idx_y) * (len(bin_edges_z) - 1) + idx_z
weights[0] = flat_hist[idx]
else: # outside of binning or nan
weights[0] = 0.
@guvectorize(
[f'({FX}[:], {FX}[:], {FX}[:], {FX}[:, :], {FX}[:], {FX}[:], {FX}[:], {FX}[:])'],
'(), (), (), (j, d), (k), (l), (m) -> (d)',
target='cpu',
)
def lookup_vectorized_3d_arrays(
sample_x,
sample_y,
sample_z,
flat_hist,
bin_edges_x,
bin_edges_y,
bin_edges_z,
weights,
):
"""Vectorized gufunc to perform the lookup while flat hist and weights have
both a second dimension"""
x = sample_x[0]
y = sample_y[0]
z = sample_z[0]
if (
x >= bin_edges_x[0]
and x <= bin_edges_x[-1]
and y >= bin_edges_y[0]
and y <= bin_edges_y[-1]
and z >= bin_edges_z[0]
and z <= bin_edges_z[-1]
):
idx_x = find_index_unsafe(x, bin_edges_x)
idx_y = find_index_unsafe(y, bin_edges_y)
idx_z = find_index_unsafe(z, bin_edges_z)
idx = (idx_x * (len(bin_edges_y) - 1) + idx_y) * (len(bin_edges_z) - 1) + idx_z
for i in range(weights.size):
weights[i] = flat_hist[idx, i]
else: # outside of binning or nan
for i in range(weights.size):
weights[i] = 0.
[docs]
def test_histogram():
"""Unit tests for `histogram` function.
Correctness is defined as matching the histogram produced by
numpy.histogramdd.
"""
all_num_bins = [2, 3, 4]
n_evts = 10000
rand = np.random.RandomState(seed=0)
weights = rand.rand(n_evts).astype(FTYPE)
binning = []
sample = []
for num_dims, num_bins in enumerate(all_num_bins, start=1):
binning.append(
OneDimBinning(
name=f'dim{num_dims - 1}',
num_bins=num_bins,
is_lin=True,
domain=[0, num_bins],
)
)
s = rand.rand(n_evts).astype(FTYPE) * num_bins
sample.append(s)
bin_edges = [b.edge_magnitudes for b in binning]
test = histogram(sample, weights, MultiDimBinning(binning), averaged=False)
ref, _ = np.histogramdd(sample=sample, bins=bin_edges, weights=weights)
ref = ref.astype(FTYPE).ravel()
assert recursiveEquality(test, ref), f'\ntest:\n{test}\n\nref:\n{ref}'
test_avg = histogram(sample, weights, MultiDimBinning(binning), averaged=True)
ref_counts, _ = np.histogramdd(sample=sample, bins=bin_edges, weights=None)
ref_counts = ref_counts.astype(FTYPE).ravel()
ref_avg = (ref / ref_counts).astype(FTYPE)
assert recursiveEquality(test_avg, ref_avg), \
f'\ntest_avg:\n{test_avg}\n\nref_avg:\n{ref_avg}'
logging.info('<< PASS : test_histogram >>')
[docs]
def test_find_index():
"""Unit tests for `find_index` function.
Correctness is defined as producing the same histogram as numpy.histogramdd
by using the output of `find_index` (ignoring underflow and overflow values).
Additionally, -1 should be returned if a value is below the range
(underflow) or is nan, and num_bins should be returned for a value above
the range (overflow).
"""
# Negative, positive, integer, non-integer, binary-unrepresentable (0.1) edges
basic_bin_edges = [-1, -0.5, -0.1, 0, 0.1, 0.5, 1, 2, 3, 4]
failures = 0
for basic_bin_edges in [
# Negative, positive, integer, non-integer, binary-unrepresentable (0.1) edges
[-1, -0.5, -0.1, 0, 0.1, 0.5, 1, 2, 3, 4],
# A single infinite bin: [-np.inf, np.inf]
[],
# Half-infinite bins (lower or upper edge) & [-inf, .1, +inf]
[0.1],
# Single bin with finite edges & +/-inf-edge(s)-added variants
[-0.1, 0.1],
]:
# Bin edges from above, w/ and w/o +/-inf as left and/or right edges
for le, re in [
(None, None),
(-np.inf, None),
(None, np.inf),
(-np.inf, np.inf)
]:
bin_edges = deepcopy(basic_bin_edges)
if le is not None:
bin_edges = [le] + bin_edges
if re is not None:
bin_edges = bin_edges + [re]
if len(bin_edges) < 2:
continue
logging.debug('bin_edges being tested: %s', bin_edges)
bin_edges = np.array(bin_edges, dtype=FTYPE)
num_bins = len(bin_edges) - 1
underflow_idx = -1
overflow_idx = num_bins
#
# Construct test values to try out
#
non_finite_vals = [-np.inf, +np.inf, np.nan]
# Values within bins (i.e., not on edges)
inbin_vals = []
for idx in range(len(bin_edges) - 1):
lower_be = bin_edges[idx]
upper_be = bin_edges[idx + 1]
if np.isfinite(lower_be):
if np.isfinite(upper_be):
inbin_val = (lower_be + upper_be) / 2
else:
inbin_val = lower_be + 10.5
else:
if np.isfinite(upper_be):
inbin_val = upper_be - 10.5
else:
inbin_val = 10.5
inbin_vals.append(inbin_val)
# Values above/below bin edges by one unit of floating point
# accuracy
eps = np.finfo(FTYPE).eps # pylint: disable=no-member
below_edges_vals = [FTYPE((1 - eps)*be) for be in bin_edges]
above_edges_vals = [FTYPE((1 + eps)*be) for be in bin_edges]
test_vals = np.concatenate(
[
non_finite_vals,
bin_edges,
inbin_vals,
below_edges_vals,
above_edges_vals,
]
)
logging.trace('test_vals = %s', test_vals)
#
# Run tests
#
for val in test_vals:
val = FTYPE(val)
np_histvals, _ = np.histogramdd([val], np.atleast_2d(bin_edges))
nonzero_indices = np.nonzero(np_histvals)[0] # select first & only dim
if np.isnan(val):
assert len(nonzero_indices) == 0, str(len(nonzero_indices))
expected_idx = underflow_idx
elif val < bin_edges[0]:
assert len(nonzero_indices) == 0, str(len(nonzero_indices))
expected_idx = underflow_idx
elif val > bin_edges[-1]:
assert len(nonzero_indices) == 0, str(len(nonzero_indices))
expected_idx = overflow_idx
else:
assert len(nonzero_indices) == 1, str(len(nonzero_indices))
expected_idx = nonzero_indices[0]
found_idx = find_index(val, bin_edges)
if found_idx != expected_idx:
failures += 1
msg = 'val={}, edges={}: Expected idx={}, found idx={}'.format(
val, bin_edges, expected_idx, found_idx
)
logging.error(msg)
assert failures == 0, f"{failures} failures, inspect ERROR messages above for info"
logging.info('<< PASS : test_find_index >>')
if __name__ == '__main__':
set_verbosity(1)
test_find_index()
test_histogram()