import jax.scipy.stats as jstats
import matplotlib.pyplot as plt
import numpy as np
from eulerpi.core.dense_grid import generate_regular_grid
from eulerpi.core.inference import InferenceType, inference
from eulerpi.core.kde import calc_kernel_width, eval_kde_gauss
from eulerpi.core.result_manager import ResultManager
from eulerpi.examples.simple_models import LinearODE
# WARNING: The following code only works for the simplest case. Equidistant grid, same number of points in each dimension, ...
# [docs]
def integrate(z, x, y):
    """Approximate the double integral of tabulated values ``z`` over a 2D grid.

    Args:
        z: 2D array of function values; axis 0 runs along ``y`` and the
            remaining axis along ``x``.
        x: 1D array of grid coordinates for the second integration.
        y: 1D array of grid coordinates for the first integration (axis 0).

    Returns:
        float: Trapezoidal-rule approximation of the integral.
    """
    # np.trapz was deprecated and removed in NumPy 2.0 in favor of
    # np.trapezoid; resolve whichever exists so this works on both versions.
    trapezoid = getattr(np, "trapezoid", None)
    if trapezoid is None:
        trapezoid = np.trapz
    # Integrate the function over the grid: first along y (axis 0), then x.
    integral = trapezoid(trapezoid(z, y, axis=0), x, axis=0)
    return integral
# TODO: Generalize, currently only works for dense vs mcmc
# [docs]
def test_inference_mcmc_dense_exact(
    num_data_points=1000,
    num_steps=1500,
    num_runs=2,  # Please keep this higher than 1. This test is also responsible for testing whether the MCMC sampler and result_manager can concatenate multiple runs.
    num_grid_points=50,
):
    """Compare MCMC-based and dense-grid inference against the exact pdf.

    Runs EPI inference on the :class:`LinearODE` example model with both the
    MCMC and the dense-grid inference types, then checks that

    1. the resulting densities integrate to (nearly) 1 over the parameter
       limits, and
    2. the L2 distance of each result to the analytically known true pdf,
       normalized by the grid area, stays below a threshold.

    Also produces a 3D surface plot of the densities (``plt.show()`` at the
    end blocks until the window is closed when run interactively).

    Args:
        num_data_points: Number of artificial data points to generate.
        num_steps: Number of MCMC steps per run.
        num_runs: Number of MCMC runs; must be > 1 to exercise run
            concatenation in the result manager.
        num_grid_points: Number of grid points per parameter dimension.
    """
    # define the model
    model = LinearODE()
    # generate artificial data
    params = model.generate_artificial_params(num_data_points)
    data = model.generate_artificial_data(params)
    # run EPI with all inference types
    result_params, result_sim_res, result_densities = {}, {}, {}
    # One slice covering all parameter dimensions at once.
    full_slice = [np.arange(model.param_dim)]
    for inference_type in InferenceType._member_map_.values():
        # Separate result manager per inference type so results don't collide.
        result_manager = ResultManager(
            model.name, str(inference_type), full_slice
        )
        if InferenceType(inference_type) == InferenceType.MCMC:
            inference(
                model,
                data,
                inference_type,
                result_manager=result_manager,
                num_steps=num_steps,
                num_runs=num_runs,
            )
            # Take every second sample and skip the first 5% of the chain
            # (burn-in = num_steps * num_runs // 20, thinning factor = 2).
            (
                result_params[inference_type],
                result_sim_res[inference_type],
                result_densities[inference_type],
            ) = result_manager.load_inference_results(
                full_slice, num_steps * num_runs // 20, 2
            )
        elif InferenceType(inference_type) == InferenceType.DENSE_GRID:
            inference(
                model,
                data,
                inference_type,
                result_manager=result_manager,
                num_grid_points=num_grid_points,
            )
            (
                result_params[inference_type],
                result_sim_res[inference_type],
                result_densities[inference_type],
            ) = result_manager.load_inference_results(full_slice)
        else:
            # skip other inference types
            continue

    # define true pdf
    def true_pdf(grid, distribution="uniform"):
        """Evaluate the analytically known parameter pdf on ``grid``.

        NOTE(review): returns None for any other ``distribution`` value;
        only "uniform" (the default) and "gaussian" are handled.
        """
        if distribution == "uniform":
            # Density of a uniform distribution over the true parameter box.
            val = (
                1.0
                / (
                    LinearODE.TRUE_PARAM_LIMITS[0, 1]
                    - LinearODE.TRUE_PARAM_LIMITS[0, 0]
                )
                / (
                    LinearODE.TRUE_PARAM_LIMITS[1, 1]
                    - LinearODE.TRUE_PARAM_LIMITS[1, 0]
                )
            )
            # return val where grid is in the limits and 0 otherwise
            return np.where(
                np.logical_and(
                    np.logical_and(
                        grid[:, 0] >= LinearODE.TRUE_PARAM_LIMITS[0, 0],
                        grid[:, 0] <= LinearODE.TRUE_PARAM_LIMITS[0, 1],
                    ),
                    np.logical_and(
                        grid[:, 1] >= LinearODE.TRUE_PARAM_LIMITS[1, 0],
                        grid[:, 1] <= LinearODE.TRUE_PARAM_LIMITS[1, 1],
                    ),
                ),
                val,
                0,
            )
        elif distribution == "gaussian":
            mean = np.zeros(model.param_dim) + 1.5
            cov = np.eye(model.param_dim)
            return jstats.multivariate_normal.pdf(grid, mean, cov)

    # Extract and process the results
    lims = LinearODE.PARAM_LIMITS
    x = np.linspace(lims[0, 0], lims[0, 1], num_grid_points)
    y = np.linspace(lims[1, 0], lims[1, 1], num_grid_points)
    # Flattened (num_grid_points**2, param_dim) evaluation grid; assumed to
    # match the grid used internally by the dense-grid inference — TODO confirm.
    grid = generate_regular_grid(
        np.array([num_grid_points, num_grid_points]), lims, flatten=True
    )
    grid_2d = grid.reshape(num_grid_points, num_grid_points, model.param_dim)
    # grid = results[InferenceType.DENSE_GRID][2]
    mcmc_params = result_params[InferenceType.MCMC]["Slice_Q0Q1"]
    # KDE of the MCMC samples, evaluated on the regular grid.
    mcmc_kde = eval_kde_gauss(
        mcmc_params, grid, calc_kernel_width(mcmc_params)
    )
    dense_grid_pdf = result_densities[InferenceType.DENSE_GRID]["Slice_Q0Q1"]
    true_pdf_grid = true_pdf(grid)
    true_kde = eval_kde_gauss(params, grid, calc_kernel_width(params))
    true_pdf_samples = true_pdf(params)

    def to2d(grid):
        # Reshape a flattened grid evaluation back to (num_grid_points, num_grid_points).
        return grid.reshape(num_grid_points, num_grid_points)

    integral_mcmc_kde = integrate(to2d(mcmc_kde), x, y)
    integral_dense_grid_pdf = integrate(to2d(dense_grid_pdf), x, y)
    integral_true_pdf = integrate(to2d(true_pdf_grid), x, y)
    # DEBUGGING
    print("integral of mcmc kde", integral_mcmc_kde)
    print("integral of dense grid pdf ", integral_dense_grid_pdf)
    print("integral of true pdf ", integral_true_pdf)
    # Just a quick check if the integrals are correct and that the range chosen limits are large enough
    threshold = 0.9  # We want to capture at least 90% of the probability mass
    # TODO: The threshold should be adapted depending on how hard the problem is
    # and how many samples / grid points we have
    assert integral_mcmc_kde > threshold
    assert integral_dense_grid_pdf > threshold
    assert integral_true_pdf > threshold
    # Calculate the errors on the grid
    mcmc_kde_error = np.abs(mcmc_kde - true_pdf_grid)
    dense_grid_pdf_error = np.abs(dense_grid_pdf - true_pdf_grid)

    def l2_error(x, y, error):
        # L2 norm of the pointwise error over the parameter box.
        return np.sqrt(integrate(to2d(error**2), x, y))

    # Calculate the L2 error
    integral_mcmc_kde_error = l2_error(x, y, mcmc_kde_error)
    integral_dense_grid_pdf_error = l2_error(x, y, dense_grid_pdf_error)
    # Divide the integral through the area of the grid
    integral_mcmc_kde_error /= (lims[0, 1] - lims[0, 0]) * (
        lims[1, 1] - lims[1, 0]
    )
    integral_dense_grid_pdf_error /= (lims[0, 1] - lims[0, 0]) * (
        lims[1, 1] - lims[1, 0]
    )
    # DEBUGGING
    print("integral of mcmc kde error", integral_mcmc_kde_error)
    print("integral of dense grid pdf error", integral_dense_grid_pdf_error)
    # Plot toggles: flip these to visualize additional data series.
    scatter_mcmc_params = False
    surf_mcmc_kde = True
    scatter_true_params = False
    surf_true_pdf_grid = True
    surf_true_kde = False
    surf_dense_grid_pdf = True
    # Surface plot
    fig = plt.figure()
    ax = fig.add_subplot(111, projection="3d")
    if scatter_mcmc_params:
        ax.scatter(
            mcmc_params[:, 0], mcmc_params[:, 1], 0, label="mcmc params"
        )  # We dont know the normalizing constant, so we cant plot the density
    if scatter_true_params:
        ax.scatter(
            params[:, 0], params[:, 1], true_pdf_samples, label="true params"
        )
    if surf_mcmc_kde:
        surf = ax.plot_surface(
            grid_2d[:, :, 0],
            grid_2d[:, :, 1],
            to2d(mcmc_kde),
            alpha=0.3,
            label="mcmc kde",
        )
        # Workaround: Poly3DCollection lacks 2D colors, which ax.legend() needs.
        surf._edgecolors2d = surf._edgecolor3d
        surf._facecolors2d = surf._facecolor3d
    if surf_true_pdf_grid:
        surf = ax.plot_surface(
            grid_2d[:, :, 0],
            grid_2d[:, :, 1],
            to2d(true_pdf_grid),
            alpha=0.3,
            label="true pdf",
        )
        # Same legend workaround as above.
        surf._edgecolors2d = surf._edgecolor3d
        surf._facecolors2d = surf._facecolor3d
    if surf_true_kde:
        surf = ax.plot_surface(
            grid_2d[:, :, 0],
            grid_2d[:, :, 1],
            to2d(true_kde),
            alpha=0.1,
            label="true kde",
        )
        # Same legend workaround as above.
        surf._edgecolors2d = surf._edgecolor3d
        surf._facecolors2d = surf._facecolor3d
    if surf_dense_grid_pdf:
        surf = ax.plot_surface(
            grid_2d[:, :, 0],
            grid_2d[:, :, 1],
            to2d(dense_grid_pdf),
            alpha=0.3,
            label="dense grid pdf",
        )
        # Same legend workaround as above.
        surf._edgecolors2d = surf._edgecolor3d
        surf._facecolors2d = surf._facecolor3d
    ax.set_xlabel("param 1")
    ax.set_ylabel("param 2")
    ax.set_zlabel("density")
    ax.legend()
    # NOTE(review): plt.show() blocks in interactive backends; in headless CI
    # this presumably uses a non-interactive backend — verify test setup.
    plt.show()
    # Assert that the errors are "small"
    # TODO: Giving a meaning to the threshold / error should be easy, because we know the true pdf and the pdfs are normalized
    # TODO: Then evaluate whether the threshold is set reasonable
    # TODO: The threshold should be adapted depending on how hard the problem is
    # and how many samples / grid points we have
    threshold = 0.02
    assert integral_mcmc_kde_error < threshold
    assert integral_dense_grid_pdf_error < threshold
# [docs]
def test_thinning_and_burn_in():
    """Test if the thinning and burn in works as expected.

    First runs a real MCMC inference and checks that the loaded results have
    the shapes implied by ``num_burn_in_samples`` and ``thinning_factor``.
    Then overwrites the stored parameter files with synthetic, index-encoding
    data and reloads, verifying that exactly the post-burn-in, thinned
    samples are returned.
    """
    model = LinearODE()
    # generate artificial data
    params = model.generate_artificial_params(1000)
    data = model.generate_artificial_data(params)
    # run EPI with one trivial slice (only parameter dimension 0)
    slices = [np.array([0])]
    num_steps = 1000
    num_walkers = 4
    num_runs = 2
    num_burn_in_samples = 100
    thinning_factor = 4
    # MCMC inference
    overall_params, sim_results, density_evals, result_manager = inference(
        model=model,
        data=data,
        slices=slices,
        inference_type=InferenceType.MCMC,
        num_steps=num_steps,
        num_walkers=num_walkers,
        num_burn_in_samples=num_burn_in_samples,
        thinning_factor=thinning_factor,
        num_runs=num_runs,
        run_name="test_thinning_and_burn_in",
    )
    # Check if the thinning and burn in results in the expected shapes:
    # burn-in is dropped per walker, then the remainder is thinned.
    num_total_samples = (
        num_steps * num_walkers * num_runs - num_walkers * num_burn_in_samples
    ) // thinning_factor
    assert overall_params["Slice_Q0"].shape[1] == slices[0].shape[0]
    assert sim_results["Slice_Q0"].shape[1] == model.data_dim
    assert density_evals["Slice_Q0"].shape[1] == 1
    assert overall_params["Slice_Q0"].shape[0] == num_total_samples
    assert sim_results["Slice_Q0"].shape[0] == num_total_samples
    assert density_evals["Slice_Q0"].shape[0] == num_total_samples
    # create some artificial runs to test the thinning and burn in;
    # each stored value encodes its sample index (integer-divided by the
    # thinning factor) so we can tell afterwards which samples survived.
    artificial_test_data_run = (
        np.arange(num_walkers * num_steps) // thinning_factor
    )
    np.savetxt(
        result_manager.get_slice_path(np.array([0])) + "/Params/params_0.csv",
        artificial_test_data_run,
        delimiter=",",
    )
    # Second run: same pattern shifted by 1000 to keep runs distinguishable.
    artificial_test_data_run += 1000
    np.savetxt(
        result_manager.get_slice_path(np.array([0])) + "/Params/params_1.csv",
        artificial_test_data_run,
        delimiter=",",
    )
    (
        overall_params,
        sim_results,
        density_evals,
    ) = result_manager.load_inference_results()
    # check if the correct samples were burned and thinned
    assert np.all(
        (overall_params["Slice_Q0"] - num_burn_in_samples) % thinning_factor
        == 0
    )
    assert np.all(overall_params["Slice_Q0"] >= num_burn_in_samples)
# Run the (slow) exact-comparison inference test when executed as a script.
if __name__ == "__main__":
    test_inference_mcmc_dense_exact()