# Source code for tests.test_kde (extracted from a Sphinx viewcode page)

"""Tests for the Kernel Density Estimation module"""

import jax
import jax.scipy.stats as jstats
import matplotlib.pyplot as plt
import numpy as np
import pytest
from matplotlib import cm

from eulerpi.core.dense_grid import generate_regular_grid
from eulerpi.core.kde import calc_kernel_width, eval_kde_cauchy, eval_kde_gauss


def kernel_estimators():
    """Yield the available kernel density estimator functions."""
    yield from (eval_kde_cauchy, eval_kde_gauss)
def kde_data_test_set():
    """Yield (data, stdevs, grid_bounds) triples for the KDE data tests."""
    cases = [
        (
            np.array([[0.5, 2.0]]),
            np.array([2.0, 1.0]),
            [[-2.5, 3.5], [0.5, 3.5]],
        ),
        (
            np.array([[0.0, 0.0], [1.0, 1.0], [2.0, 0.0]]),
            np.array([0.5, 1.0]),
            [[-1.0, 3.0], [-1, 2.0]],
        ),
    ]
    yield from cases
# Mark the test as expected to fail because the function is not implemented yet, but definitely should be implemented soon!
@pytest.mark.xfail(reason="Not implemented yet, but is important!")
def test_calc_kernel_width():
    """Placeholder for the kernel-width test; the deliberate failing
    assertion keeps it xfail until real expectations are written."""
    sample = np.array([[0.0, 1.0], [-1.0, 2.0], [1.0, 3.0]])
    kernel_widths = calc_kernel_width(sample)
    assert 0 == 1
@pytest.mark.xfail(reason="Not implemented yet, contains false values")
def test_kde_gauss():
    """Placeholder Gaussian-KDE test; the expected values (and the
    array comparison itself) are known to be wrong, hence xfail."""
    from eulerpi.core.kde import eval_kde_gauss

    samples = np.array([[0.0], [2.0]])
    widths = calc_kernel_width(samples)
    eval_points = np.array([[0.0], [1.0], [2.0]])
    density = eval_kde_gauss(samples, eval_points, widths)
    assert density == np.array([0.5, 0.25, 0.5])
@pytest.mark.parametrize("evalKDE", kernel_estimators())
@pytest.mark.parametrize("batch", [False, True])
def test_KDE_batch(batch, evalKDE):
    """Evaluate each kernel density estimator on random 2D data, either
    at a single point or at a batch of points.

    Args:
        batch: whether to evaluate at a batch of points or a single one
        evalKDE: the kernel density estimator under test
    """
    data_dim = 2
    num_data_points = 3
    samples = np.random.rand(num_data_points, data_dim)
    widths = calc_kernel_width(samples)

    n_samples = 5
    # Either a (n_samples, dim) batch or one bare dim-vector.
    eval_points = (
        np.random.rand(n_samples, data_dim)
        if batch
        else np.array([0.5] * data_dim)
    )

    result = evalKDE(samples, eval_points, widths)

    # Batched evaluation keeps the leading sample axis; scalar otherwise.
    expected_shape = (n_samples,) if batch else ()
    assert result.shape == expected_shape
@pytest.mark.parametrize("data, stdevs, grid_bounds", kde_data_test_set())
@pytest.mark.parametrize("evalKDE", kernel_estimators())
def test_KDE_data(evalKDE, data, stdevs, grid_bounds, resolution=33):
    """Evaluate each kernel density estimator over a regular 2D grid for
    a given data set and plot the resulting surface.

    Args:
        evalKDE: the kernel density estimator under test
        data: data points defining the KDE
        stdevs: kernel widths per dimension
        grid_bounds: [[x_min, x_max], [y_min, y_max]] evaluation window
        resolution: grid points per axis (Default value = 33)
    """
    x_axis = np.linspace(grid_bounds[0][0], grid_bounds[0][1], resolution)
    y_axis = np.linspace(grid_bounds[1][0], grid_bounds[1][1], resolution)
    x_mesh, y_mesh = np.meshgrid(x_axis, y_axis)
    grid_points = np.stack([x_mesh, y_mesh], axis=-1)

    # Vectorize only over the evaluation points (rows of the mesh),
    # not over the data points or the kernel widths.
    batched_kde = jax.vmap(evalKDE, in_axes=(None, 0, None))
    evaluated = batched_kde(data, grid_points, stdevs)

    fig, ax = plt.subplots(subplot_kw={"projection": "3d"})
    ax.plot_surface(
        x_mesh,
        y_mesh,
        evaluated,
        alpha=0.75,
        cmap=cm.coolwarm,
        linewidth=0,
        antialiased=False,
    )
    plt.show()
# WARNING: The following code only works for the simplest case. Equidistant grid, same number of points in each dimension, ...
def integrate(z, x, y):
    """Approximate the double integral of grid-sampled values via the
    trapezoidal rule.

    Axis 0 of ``z`` is paired with the ``y`` coordinates and axis 1
    with the ``x`` coordinates (matching how the callers build their
    meshes).

    Args:
        z: 2D array of function values with shape (len(y), len(x))
        x: 1D coordinate array along axis 1
        y: 1D coordinate array along axis 0

    Returns:
        The scalar trapezoidal approximation of the integral.
    """
    # np.trapz was renamed to np.trapezoid in NumPy 2.0 and the old name
    # removed; fall back to np.trapz on older NumPy versions.
    try:
        trapz = np.trapezoid
    except AttributeError:
        trapz = np.trapz
    # Integrate out the y-axis (axis 0) first, then the x-axis.
    integral = trapz(trapz(z, y, axis=0), x, axis=0)
    return integral
@pytest.mark.parametrize("dim", [1, 2], ids=["1D", "2D"])
def test_kde_convergence_gauss(
    dim, num_grid_points=100, num_data_points=10000
):
    """Test whether the KDE converges to the true distribution."""
    # Draw standard-normal samples and build the KDE from them.
    samples = np.random.randn(num_data_points, dim)
    widths = calc_kernel_width(samples)

    # Regular evaluation grid over [-5, 5] in every dimension.
    grid_shape = np.array(
        [num_grid_points for _ in range(dim)], dtype=np.int32
    )
    limits = np.array([[-5, 5] for _ in range(dim)])
    grid = generate_regular_grid(grid_shape, limits, flatten=True)

    kde_on_grid = eval_kde_gauss(samples, grid, widths)

    # Reference: the exact standard-normal density on the same grid.
    exact_on_grid = jstats.multivariate_normal.pdf(
        grid, np.zeros(dim), np.eye(dim)
    )
    diff = np.abs(kde_on_grid - exact_on_grid)

    import matplotlib.pyplot as plt

    if dim == 1:
        grid = grid[:, 0]
        # L1 error between KDE and exact density over the grid.
        error = np.trapz(diff, grid)
        assert error < 0.1  # ~0.06 for 100 grid points, 1000 data points
        plt.plot(grid, kde_on_grid)
        plt.plot(grid, exact_on_grid)
    elif dim == 2:
        diff = diff.reshape(grid_shape[0], grid_shape[1])
        x = np.linspace(limits[0, 0], limits[0, 1], grid_shape[0])
        y = np.linspace(limits[1, 0], limits[1, 1], grid_shape[1])
        error = integrate(diff, x, y)
        assert error < 0.15  # ~0.13 for 100 grid points, 1000 data points

        fig = plt.figure()
        ax = fig.add_subplot(111, projection="3d")
        grid_2d = grid.reshape(grid_shape[0], grid_shape[1], dim)
        for values, tag in (
            (exact_on_grid, "exact"),
            (kde_on_grid, "kde"),
        ):
            surf = ax.plot_surface(
                grid_2d[:, :, 0],
                grid_2d[:, :, 1],
                values.reshape(grid_shape[0], grid_shape[1]),
                alpha=0.7,
                label=tag,
            )
            # Workaround so 3D surfaces can appear in a legend.
            surf._edgecolors2d = surf._edgecolor3d
            surf._facecolors2d = surf._facecolor3d
    plt.show()