"""
Implementation of the Improved Sheather Jones (ISJ) KDE bandwidth selection
method outlined in:
Z. I. Botev, J. F. Grotowski, and D. P. Kroese. Kernel density
estimation via diffusion. The Annals of Statistics, 38(5):2916-2957, 2010.
and extension to use this in variable-bandwidth KDE via
I. S. Abramson. On bandwidth variation in kernel estimates - a square root
law. The Annals of Statistics, 10(4):1217-1223, 1982.
See also
P. Hall, T. C. Hu, and J. S. Marron. Improved variable window kernel
estimates of probability densities. The Annals of Statistics, 23(1):1-10,
1995.
Some code in this module is Copyright (c) 2007 Zdravko Botev. See __license__
for details.
"""
# TODO: Make normalization use a relative density value normalized after ISJ
# bandwidth to take into consideration really-high densities (which can have
# narrower bandwidths than ISJ dictates) or really-low densities (which should
# have very wide bandwidths, wider than ISJ dictates)
# TODO: accuracy tests for fbwkde and vbwkde
from __future__ import absolute_import, division
from math import exp, sqrt
from time import time
import numpy as np
from scipy import fftpack, interpolate, optimize
from pisa import FTYPE, numba_jit
from pisa.utils.gaussians import gaussians
from pisa.utils.log import logging, set_verbosity, tprofile
__all__ = ['OPT_TYPE', 'FIXED_POINT_IMPL',
           'fbwkde', 'vbwkde', 'isj_bandwidth',
           'test_fbwkde', 'test_vbwkde', 'test_weighted_vbwkde']
__author__ = 'Z. Botev, J.L. Lanfranchi'
__license__ = '''Original BSD license, applicable *ONLY* to functions
"isj_bandwidth" and "fixed_point" since these were derived from Botev's
original work (this license applies to any future code derived from those
functions as well):
Copyright (c) 2007, Zdravko Botev
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
============================================================================
All other code in this module is under the following copyright/license:
Copyright (c) 2014-2017, The IceCube Collaboration
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.'''
# NOTE: 'minimum' is giving very rough overfit results. Switching to 'root' for
# now.
OPT_TYPE = 'root'
FIXED_POINT_IMPL = 'numba_loops'
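# Recognized values are OPT_TYPE in ('root', 'minimum') and FIXED_POINT_IMPL
# in ('numba_np', 'numba_loops', 'numba_orig'); any other value raises
# ValueError (see `isj_bandwidth` and the `optfunc` definition below)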
_PI = np.pi
_TWOPI = 2*np.pi
_SQRTPI = np.sqrt(np.pi)
_SQRT2PI = np.sqrt(2*np.pi)
_PISQ = np.pi*np.pi
def fbwkde(data, weights=None, n_dct=None, min=None, max=None,
evaluate_dens=True, evaluate_at=None):
"""Fixed-bandwidth (standard) Gaussian KDE using the Improved
Sheather-Jones bandwidth.
Code adapted for Python from the implementation in Matlab by Zdravko Botev.
Ref: Z. I. Botev, J. F. Grotowski, and D. P. Kroese. Kernel density
estimation via diffusion. The Annals of Statistics, 38(5):2916-2957, 2010.
Parameters
----------
    data : array
        The data points for which the density estimate is sought
    weights : array or None
        Weight for each point in `data`
    n_dct : None or int
        Number of points with which to form a regular grid, from `min` to
        `max`; histogram values at these points are sent through a discrete
        Cosine Transform (DCT), so `n_dct` should be an integer power of 2
        for speed purposes. If None, uses next-highest-power-of-2 above
        len(data)*10.
    min : float or None
        Minimum of range over which to compute density.
        If None, defaults to min(data) - range(data)/2
    max : float or None
        Maximum of range over which to compute density.
        If None, defaults to max(data) + range(data)/2
    evaluate_dens : bool
        Whether to evaluate and return the density estimate at the points
        defined by `evaluate_at`
    evaluate_at : None or array
        Point(s) at which to evaluate the density estimate. If None, a
        regular grid of the `n_dct` histogram bin centers spanning
        (`min`, `max`) is used.

    Returns
    -------
    bandwidth : float
        The ISJ bandwidth
    evaluate_at : array of float
        Locations at which the density was (or would be) evaluated; None if
        `evaluate_dens` is False and no `evaluate_at` was supplied
    density : None or array of float
        Density estimate, or None if `evaluate_dens` is False
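
    Examples
    --------
    A minimal usage sketch (synthetic data, purely illustrative)::

        x = np.random.normal(size=int(1e3))
        bw, grid, dens = fbwkde(data=x)
        norm = np.trapz(dens, grid)  # should be close to 1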
"""
if n_dct is None:
n_dct = int(2**np.ceil(np.log2(len(data)*10)))
assert int(n_dct) == n_dct
n_dct = int(n_dct)
n_datapoints = len(data)
# Parameters to set up the points on which to evaluate the density
if min is None or max is None:
minimum = data.min()
maximum = data.max()
data_range = maximum - minimum
min = minimum - data_range/2 if min is None else min
max = maximum + data_range/2 if max is None else max
hist_range = max - min
# Histogram the data to get a crude first approximation of the density
data_hist, bins = np.histogram(
data, bins=n_dct, range=(min, max), density=False, weights=weights
)
# Make into a probability mass function
if weights is None:
data_hist = data_hist / n_datapoints
else:
data_hist = data_hist / np.sum(weights)
# Define a minimum bandwidth relative to mean of distances between points
distances = np.diff(np.sort(data))
min_bandwidth = 2*np.pi*np.mean(distances)
logging.trace('min_bandwidth, 2pi*mean: %.5e', min_bandwidth)
# Solve for the ISJ fixed point to obtain the "optimal" bandwidth
isj_bw, t_star, dct_data = isj_bandwidth(
y=data_hist, n_datapoints=n_datapoints, x_range=hist_range,
min_bandwidth=min_bandwidth
)
# TODO: Detect numerical instability issues here, prior to returning!
if not evaluate_dens:
return isj_bw, evaluate_at, None
    if evaluate_at is None:
        # Smooth the discrete-cosine-transformed data using t_star
        sm_dct_data = dct_data*np.exp(-np.arange(n_dct)**2 * _PISQ*t_star/2)

        # Inverse DCT to get density
        density = fftpack.idct(sm_dct_data, norm=None)*n_dct/hist_range
        evaluate_at = (bins[0:-1] + bins[1:]) / 2
        if np.any(density < 0):
            logging.trace(
                'ISJ encountered numerical instability in the IDCT'
                ' (resulting in a negative density). The result will be'
                ' computed as an explicit sum of Gaussians instead.'
            )
        else:
            density = density/np.trapz(density, evaluate_at)
            return isj_bw, evaluate_at, density

    # `evaluate_at` was supplied by the caller, or the IDCT was numerically
    # unstable: evaluate the density as a sum of Gaussian kernels instead
    evaluate_at = np.asarray(evaluate_at, dtype=FTYPE)
    density = gaussians(
        x=evaluate_at,
        mu=data.astype(FTYPE),
        sigma=np.full(shape=n_datapoints, fill_value=isj_bw, dtype=FTYPE),
        weights=weights
    )
    return isj_bw, evaluate_at, density
def vbwkde(data, weights=None, n_dct=None, min=None, max=None, n_addl_iter=0,
evaluate_dens=True, evaluate_at=None):
"""Variable-bandwidth (standard) Gaussian KDE that uses the function
`fbwkde` for a pilot density estimate.
Parameters
----------
data : array
The data points for which the density estimate is sought
weights : array or None
Weight for each point in `data`
n_dct : None or int
Number of points with which to form a regular grid, from `min` to
`max`; histogram values at these points are sent through a discrete
Cosine Transform (DCT), so `n_dct` should be an integer power of 2 for
speed purposes. If None, uses next-highest-power-of-2 above
len(data)*10.
min : None or float
Minimum of range over which to compute density.
If None, defaults to min(data) - range(data)/2
max : None or float
Maximum of range over which to compute density.
If None: max(data) + range(data)/2
n_addl_iter : int >= 0
Number of additional iterations on the Abramson VBW density *after*
the initial VBW estimate. This can help smooth the tails of the
distribution, at the expense of additional computational cost.
evaluate_dens : bool
Whether to evaluate and return the density estimate on the points
defined by `evaluate_at`
    evaluate_at : None, float, or array of float
        Point(s) at which to evaluate the density estimate. If None, the
        evaluation points are the `n_dct` histogram bin centers (augmented
        with the endpoints `min` and `max`), i.e. approximately
            evaluate_at = np.linspace(min + dx/2, max - dx/2, n_dct)
        where
            dx = (max - min) / n_dct
Returns
-------
    kernel_bandwidths : array
        Bandwidth of the Gaussian kernel centered at each point in `data`
    evaluate_at : array
        Locations at which the density estimate was evaluated
    density : array or None
        Density estimate, or None if `evaluate_dens` is False
Notes
-----
    Specifying the range:

        The specification of `min` and `max` is critical for obtaining a
        reasonable density estimate. If the true underlying density decays
        slowly to zero on one side or the other, like a Gaussian, specifying
        too small a range will distort the edge that the VBW-KDE finds. On
        the other hand, an abrupt cut-off in the distribution should be
        accompanied by a similar cut-off in the computational range (`min`
        and/or `max`). The algorithm here approximates such a sharp cut-off
        with roughly the same performance as the reflection method for
        standard KDEs (since the fixed-bandwidth portion uses a DCT of the
        data), but note that this will not perform as well as polynomial
        edge corrections or other modifications that have been proposed in
        the literature.
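
    Examples
    --------
    A minimal usage sketch (synthetic data, purely illustrative; the extra
    Abramson iteration is optional)::

        x = np.random.normal(size=int(1e3))
        bws, grid, dens = vbwkde(data=x, n_addl_iter=1)
        # one kernel bandwidth per point in `x`
        assert len(bws) == len(x)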
"""
if n_dct is None:
n_dct = int(2**np.ceil(np.log2(len(data)*10)))
assert n_addl_iter >= 0 and int(n_addl_iter) == n_addl_iter
n_addl_iter = int(n_addl_iter)
# Parameters to set up the points on which to evaluate the density
if min is None or max is None:
minimum = data.min()
maximum = data.max()
data_range = maximum - minimum
min = minimum - data_range/2 if min is None else min
max = maximum + data_range/2 if max is None else max
# Pilot density estimate for the VBW KDE comes from fixed bandwidth KDE
# using the Improved Sheather-Jones algorithm. By specifying
# `evaluate_at` to be None, `fbwkde` derives a regular grid at which to
# evaluate the points and does so without needing to do a sum of Gaussians
# (only a freq.-domain multiply and inverse DCT)
isj_bw, grid, pilot_dens_on_grid = fbwkde(
data=data, weights=weights, n_dct=n_dct, min=min, max=max,
evaluate_dens=True, evaluate_at=None
)
if np.any(pilot_dens_on_grid < 0):
        raise ValueError(
            'Pilot density estimate from ISJ contains negative values,'
            ' indicating numerical instability'
        )
# Include edges (min, max) in the grid and extend the densities to the
# left/right, respectively. (Make the profile constant out to the edge).
all_gridpoints = []
all_dens = []
if grid[0] != min:
all_gridpoints.append([min])
all_dens.append([pilot_dens_on_grid[0]])
all_gridpoints.append(grid)
all_dens.append(pilot_dens_on_grid)
if grid[-1] != max:
all_gridpoints.append([max])
all_dens.append([pilot_dens_on_grid[-1]])
grid = np.concatenate(all_gridpoints)
pilot_dens_on_grid = np.concatenate(all_dens)
# Use linear interpolation to get density at the data points
interp = interpolate.interp1d(
x=grid,
y=pilot_dens_on_grid,
kind='linear',
copy=False,
bounds_error=True,
fill_value=np.nan
)
# TODO: For some reason this gets translated into an object array, so the
# `astype` call is necessary
pilot_dens_at_datapoints = interp(data).astype(FTYPE)
n_iter = 1 + n_addl_iter
for n in range(n_iter):
# Note below diverges from the published Abramson method, by forcing
# the bandwidth at the max of the density distribution to be exactly
# the bandwidth found above with the improved Sheather-Jones BW
# selection technique. Refs:
# I.S. Abramson, On bandwidth variation in kernel estimates - A
# square root law, Annals of Stat. Vol. 10, No. 4, 1217-1223 1982
# P. Hall, T. C. Hu, J. S. Marron, Improved Variable Window Kernel
# Estimates of Probability Densities, Annals of Statistics
# Vol. 23, No. 1, 1-10, 1995
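        # That is, lambda_i = isj_bw * sqrt(max_j f_pilot(x_j) / f_pilot(x_i)):
        # the kernel centered where the pilot density peaks gets exactly
        # isj_bw, and kernels in lower-density regions get wider bandwidths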
kernel_bandwidths = (
isj_bw * np.sqrt(np.max(pilot_dens_at_datapoints))
/ np.sqrt(pilot_dens_at_datapoints)
)
if n < n_addl_iter:
pilot_dens_at_datapoints = gaussians(
x=data,
mu=data,
sigma=kernel_bandwidths,
weights=weights
)
else: # final iteration
if evaluate_at is None:
evaluate_at = grid
if evaluate_dens:
density = gaussians(
x=evaluate_at,
mu=data,
sigma=kernel_bandwidths,
weights=weights
)
else:
density = None
return kernel_bandwidths, evaluate_at, density
def isj_bandwidth(y, n_datapoints, x_range, min_bandwidth):
"""
Parameters
----------
y : array of float
n_datapoints : int
x_range : float
Returns
-------
bandwidth : float
t_star : float
dct_data : array of float
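
    Examples
    --------
    A minimal sketch mirroring how `fbwkde` invokes this function (the grid
    size, range, and `min_bandwidth` here are illustrative)::

        data = np.random.normal(loc=0.5, scale=0.1, size=int(1e4))
        hist, bins = np.histogram(data, bins=1024, range=(0, 1))
        bw, t_star, dct_data = isj_bandwidth(
            y=hist/len(data), n_datapoints=len(data), x_range=1.0,
            min_bandwidth=1e-3
        )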
"""
# Ensure double-precision datatypes are used
y = np.asarray(y, dtype=np.float64)
x_range = np.float64(x_range)
n_dct = len(y)
min_t_star = (min_bandwidth/x_range)**2
i_range = np.arange(1, n_dct, dtype=np.float64)**2
log_i_range = np.log(i_range)
dct_data = fftpack.dct(y, norm=None)
dct_data_sq = 0.25 * (dct_data * dct_data)[1:]
logging.trace('Fixed point implementation = %s', FIXED_POINT_IMPL)
logging.trace('Optimization type = %s', OPT_TYPE)
if FIXED_POINT_IMPL == 'numba_np':
func = fixed_point_numba_np
args = n_datapoints, i_range, log_i_range, dct_data_sq
elif FIXED_POINT_IMPL == 'numba_loops':
func = fixed_point_numba_loops
args = n_datapoints, i_range, log_i_range, dct_data_sq
elif FIXED_POINT_IMPL == 'numba_orig':
func = fixed_point_numba_orig
args = n_datapoints, i_range, dct_data_sq
else:
raise ValueError('Unknown FIXED_POINT_IMPL "%s"' % FIXED_POINT_IMPL)
try:
if OPT_TYPE == 'root':
t_star = optimize.brentq(
f=func,
a=min_t_star/1000,
b=0.5,
rtol=np.finfo(np.float64).eps*1e2,
args=args,
full_output=True,
disp=True
)[0]
elif OPT_TYPE == 'minimum':
t_star = optimize.minimize_scalar(
fun=func,
bounds=(min_t_star/1000, 0.5),
method='Bounded',
args=args,
options=dict(maxiter=1e6, xatol=1e-22),
).x
else:
raise ValueError('Unknown OPT_TYPE "%s"' % OPT_TYPE)
if t_star < min_t_star:
logging.trace('t_star = %.4e < min_t_star = %.4e;'
' setting to min_t_star', t_star, min_t_star)
t_star = min_t_star
# NOTE: Use `math.sqrt` _not_ `numpy.sqrt` to ensure failures raise
# exceptions (numpy might be coaxed to do so, but it'd probably be
# slow)
bandwidth = sqrt(t_star)*x_range
except ValueError:
logging.error('Improved Sheather-Jones bandwidth %s-finding failed.',
OPT_TYPE)
if min_bandwidth is not None:
logging.error('Using supplied `min_bandwidth` instead.')
bandwidth = min_bandwidth
t_star = min_t_star
else:
raise
return bandwidth, t_star, dct_data
if OPT_TYPE == 'root':
def optfunc(f):
"""No-op decorator"""
return f
elif OPT_TYPE == 'minimum':
    def optfunc(f):
        """Decorator for returning abs of function output"""
        def func(*args, **kw):
            """Return absolute value of function"""
            return np.abs(f(*args, **kw))
        return func
else:
    raise ValueError('Unknown OPT_TYPE "%s"' % OPT_TYPE)
_ELL = 7
_TWOPI2ELL = 2 * _PI**(2*_ELL)
_K0 = np.array([
    (1 + 0.5**(_S + 0.5)) * np.prod(np.arange(1, 2*_S, 2)) * 2/(3*_SQRT2PI)
    for _S in range(_ELL)
])
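# Only _K0[s] for s = 2 ... _ELL-1 is used below (indexing _K0 by s requires
# the array to have _ELL entries); each entry equals
# 2*(1 + 2**-(s + 0.5))/3 * (2s-1)!!/sqrt(2*pi), matching the constants
# computed inline in `fixed_point_numba_orig`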
@optfunc
@numba_jit(nopython=True, nogil=True, cache=True, fastmath=False)
def fixed_point_numba_np(t, n_datapoints, i_range, log_i_range, a2):
"""ISJ fixed-point calculation as per Botev et al..
In this implementation, Numba makes calls to Numpy routines.
Parameters
----------
t
n_datapoints
i_range
log_i_range
a2
Returns
-------
t_star
"""
exparg = _ELL*log_i_range - i_range*(_PISQ*t)
ex = np.exp(exparg)
eff = _TWOPI2ELL * np.dot(a2, ex)
for s in range(_ELL-1, 1, -1):
t_elap = (_K0[s] / (n_datapoints * eff))**(2 / (3 + 2*s))
exparg = s*log_i_range - i_range*(_PISQ*t_elap)
ex = np.exp(exparg)
dp = np.dot(a2, ex)
eff = 2*_PI**(2*s) * dp
return t - (2.0 * n_datapoints * _SQRTPI * eff)**-0.4
@optfunc
@numba_jit('{f}({f}, int64, {f}[:], {f}[:], {f}[:])'.format(f='float64'),
nopython=True, nogil=True, cache=True, fastmath=False)
def fixed_point_numba_loops(t, n_datapoints, i_range, log_i_range, a2):
"""ISJ fixed-point calculation as per Botev et al..
In this implementation, Numba explicitly loops over arrays.
Parameters
----------
t
n_datapoints
i_range
log_i_range
a2
Returns
-------
t_star
"""
len_i = len(i_range)
tot = 0.0
for idx in range(len_i):
a2_el = a2[idx]
log_i_range_el = log_i_range[idx]
i_range_el = i_range[idx]
exparg = _ELL*log_i_range_el - i_range_el * _PISQ * t
tot += a2_el * exp(exparg)
eff = _TWOPI2ELL * tot
for s in range(_ELL-1, 1, -1):
k0 = _K0[s]
t_elap = (k0 / (n_datapoints * eff))**(2 / (3 + 2*s))
tot = 0.0
for idx in range(len_i):
a2_el = a2[idx]
log_i_range_el = log_i_range[idx]
i_range_el = i_range[idx]
exparg = s*log_i_range_el - i_range_el*_PISQ*t_elap
tot += a2_el * exp(exparg)
eff = 2 * _PI**(2*s) * tot
return t - (2*n_datapoints*_SQRTPI*eff)**-0.4
@optfunc
@numba_jit('{f}({f}, int64, {f}[:], {f}[:])'.format(f='float64'),
nopython=True, nogil=True, cache=True, fastmath=False)
def fixed_point_numba_orig(t, n_datapoints, i_range, a2):
"""Fixed point algorithm for Improved Sheather Jones bandwidth
selection.
Implements the fixed-point finding for the function
``t - zeta * gamma^{[l]}(t)``
See The Annals of Statistics, 38(5):2916-2957, 2010.
Parameters
----------
t : float64
n_datapoints : int64
i_range : array of float64
a2 : array of float64
Returns
-------
t_star : float64
NOTES
-----
Original implementation by Botev et al. is quad-precision, whereas this is
double precision only. This might cause discrepancies from the reference
implementation.
"""
len_i = len(i_range)
tot = 0.0
for idx in range(len_i):
tot += i_range[idx]**_ELL * a2[idx] * exp(-i_range[idx] * _PISQ * t)
eff = 2 * _PI**(2*_ELL) * tot
for s in range(_ELL-1, 1, -1):
k0 = np.prod(np.arange(1, 2*s, 2)) / _SQRT2PI
const = (1 + (0.5)**(s+0.5)) / 3.0
t_elap = (2 * const * k0 / (n_datapoints * eff))**(2.0 / (3.0 + 2.0*s))
tot = 0.0
for idx in range(len_i):
tot += (
i_range[idx]**s * a2[idx] * exp(-i_range[idx] * _PISQ * t_elap)
)
eff = 2 * _PI**(2*s) * tot
return t - (2.0 * n_datapoints * _SQRTPI * eff)**-0.4
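# Note: the three fixed_point_* implementations compute the same residual;
# 'numba_orig' recomputes the per-s constants inline rather than reading them
# from _K0, and is retained as a reference for the other two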
# tests failing with numba 0.62.1
def test_fbwkde():
"""Test speed of fbwkde implementation"""
try:
n_samp = int(1e4)
n_dct = int(2**12)
n_eval = int(1e4)
x = np.linspace(0, 20, n_eval)
np.random.seed(0)
times = []
for _ in range(3):
enuerr = np.random.noncentral_chisquare(df=3, nonc=1, size=n_samp)
t0 = time()
fbwkde(data=enuerr, n_dct=n_dct, evaluate_at=x)
times.append(time() - t0)
tprofile.debug(
'median time to run fbwkde, %d samples %d dct, eval. at %d points: %f'
' ms', n_samp, n_dct, n_eval, np.median(times)*1000
)
logging.info('<< PASS : test_fbwkde >>')
except Exception as e:
# don't fail, just document here
logging.error("test_fbwkde failed: '%s'. This is under investigation..." % str(e))
def test_vbwkde():
"""Test speed of unweighted vbwkde implementation"""
try:
n_samp = int(1e4)
n_dct = int(2**12)
n_eval = int(5e3)
n_addl = 0
        x = np.linspace(0, 20, n_eval)
np.random.seed(0)
times = []
for _ in range(3):
            enuerr = np.random.noncentral_chisquare(df=3, nonc=1, size=n_samp)
t0 = time()
vbwkde(data=enuerr, n_dct=n_dct, evaluate_at=x, n_addl_iter=n_addl)
times.append(time() - t0)
tprofile.debug(
'median time to run vbwkde, %d samples %d dct %d addl iter, eval. at'
' %d points: %f ms', n_samp, n_dct, n_addl, n_eval, np.median(times)*1e3
)
logging.info('<< PASS : test_vbwkde >>')
except Exception as e:
# don't fail, just document here
logging.error("test_vbwkde failed: '%s'. This is under investigation..." % str(e))
def test_weighted_vbwkde():
"""Test speed of vbwkde implementation using weights"""
try:
n_samp = int(1e4)
n_dct = int(2**12)
n_eval = int(5e3)
n_addl = 0
        x = np.linspace(0, 20, n_eval)
np.random.seed(0)
times = []
for _ in range(3):
            enuerr = np.random.noncentral_chisquare(df=3, nonc=1, size=n_samp)
            weights = np.random.rand(n_samp)
t0 = time()
vbwkde(data=enuerr, weights=weights,
n_dct=n_dct, evaluate_at=x, n_addl_iter=n_addl)
times.append(time() - t0)
tprofile.debug(
'median time to run weighted vbwkde, %d samples %d dct %d addl iter,'
' eval. at %d points: %f ms',
n_samp, n_dct, n_addl, n_eval, np.median(times)*1e3
)
logging.info('<< PASS : test_weighted_vbwkde >>')
except Exception as e:
# don't fail, just document here
logging.error("test_weighted_vbwkde failed: '%s'. This is under investigation..." % str(e))
if __name__ == "__main__":
set_verbosity(2)
test_fbwkde()
test_vbwkde()
test_weighted_vbwkde()