"""Module implementing the :py:func:`inference <eulerpi.core.inference.inference>` with a Markov Chain Monte Carlo(MCMC) sampling.
The inference with a sampling based approach approximates the the (joint/marginal) parameter distribution(s) by calculating a parameter Markov Chain
using multiple walkers in parallel. This module is currently based on the emcee package.
.. _emcee: https://emcee.readthedocs.io/en/stable/
.. note::
The functions in this module are mainly intended for internal use and are accessed by :func:`inference <eulerpi.core.inference>` function.
Read the documentation of :func:`inference_mcmc <inference_mcmc>` to learn more about the available options for the MCMC based inference.
"""
import typing
import warnings
from multiprocessing import get_context
from os import path
import emcee
import numpy as np
from eulerpi import logger
from eulerpi.core.data_transformations import DataTransformation
from eulerpi.core.inference_types import InferenceType
from eulerpi.core.kde import calc_kernel_width
from eulerpi.core.models import BaseModel
from eulerpi.core.result_manager import ResultManager
from eulerpi.core.transformations import eval_log_transformed_density
def run_emcee_once(
model: BaseModel,
data: np.ndarray,
data_transformation: DataTransformation,
data_stdevs: np.ndarray,
slice: np.ndarray,
initial_walker_positions: np.ndarray,
num_walkers: int,
num_steps: int,
num_processes: int,
) -> typing.Tuple[np.ndarray, np.ndarray]:
"""Run the emcee particle swarm sampler once.
Args:
model (BaseModel): The model which will be sampled
        data (np.ndarray): The data to be used for the inference.
data_transformation (DataTransformation): The data transformation used to normalize the data.
data_stdevs (np.ndarray): kernel width for the data
slice (np.ndarray): slice of the parameter space which will be sampled
initial_walker_positions (np.ndarray): initial parameter values for the walkers
        num_walkers (int): number of walkers in the ensemble sampler
        num_steps (int): number of steps each walker performs before the sub run is stored
        num_processes (int): number of parallel processes

    Returns:
        typing.Tuple[np.ndarray, np.ndarray]: samples from the transformed parameter density and the final walker positions
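    Example:
        A minimal sketch of splitting the returned (flattened) result array into
        its components, assuming the column layout (parameters, simulation
        results, log density) indicated by the shape comment in the function
        body; ``sampling_dim`` and ``data_dim`` are the names used there::

            params = sampler_results[:, :sampling_dim]
            sim_results = sampler_results[:, sampling_dim : sampling_dim + data_dim]
            log_densities = sampler_results[:, -1]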
"""
# Create a pool of worker processes
pool = get_context("spawn").Pool(processes=num_processes)
sampling_dim = slice.shape[0]
# Call the sampler for all parallel workers (possibly use arg moves = movePolicy)
    try:
        sampler = emcee.EnsembleSampler(
            num_walkers,
            sampling_dim,
            eval_log_transformed_density,
            pool=pool,
            args=[model, data, data_transformation, data_stdevs, slice],
        )
        # Run the sampler and extract the final walker positions.
        # With blobs enabled, the returned state unpacks into (coords, log_prob, blobs, random_state).
        final_walker_positions, _, _, _ = sampler.run_mcmc(
            initial_walker_positions, num_steps, tune=True, progress=True
        )
    except ValueError as e:
        # Give a more helpful hint if the error message contains "Probability function returned NaN."
        if "Probability function returned NaN" in str(e):
            raise ValueError(
                "Probability function returned NaN. "
                "You possibly have to exclude data dimensions which do not depend on the parameters. "
                "In addition, your parameters should not be linearly dependent."
            ) from e
        raise
    finally:
        # Close the pool of worker processes in any case.
        pool.close()
        pool.join()
# TODO: Keep as 3d array?
    # Should have shape (num_steps, num_walkers, sampling_dim + data_dim + 1)
sampler_results = sampler.get_blobs()
data_dim = model.data_dim
sampler_results = sampler_results.reshape(
num_steps * num_walkers, sampling_dim + data_dim + 1
)
logger.info(
f"The acceptance fractions of the emcee sampler per walker are: {np.round(sampler.acceptance_fraction, 2)}"
)
try:
corrTimes = sampler.get_autocorr_time()
logger.info(f"autocorrelation time: {corrTimes[0]}")
    except emcee.autocorr.AutocorrError as e:
        logger.warning(
            f"The autocorrelation time could not be calculated reliably: {e}"
        )
return sampler_results, final_walker_positions
def run_emcee_sampling(
model: BaseModel,
data: np.ndarray,
data_transformation: DataTransformation,
slice: np.ndarray,
result_manager: ResultManager,
num_processes: int,
num_runs: int,
num_walkers: int,
num_steps: int,
num_burn_in_samples: int,
thinning_factor: int,
) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Create a representative sample from the transformed parameter density using the emcee particle swarm sampler.
Inital values are not stored in the chain and each file contains <num_steps> blocks of size num_walkers.
Args:
model (BaseModel): The model which will be sampled
        data (np.ndarray): The data to be used for the inference.
data_transformation (DataTransformation): The data transformation used to normalize the data.
slice (np.ndarray): slice of the parameter space which will be sampled
result_manager (ResultManager): ResultManager which will store the results
        num_processes (int): number of parallel processes.
        num_runs (int): number of stored sub runs.
        num_walkers (int): number of walkers in the ensemble sampler.
        num_steps (int): number of steps each walker performs before the sub run is stored.
        num_burn_in_samples (int): Number of samples that will be deleted (burned) per chain (i.e. walker). Only for MCMC inference.
        thinning_factor (int): thinning factor for the samples.
Returns:
typing.Tuple[np.ndarray, np.ndarray, np.ndarray]: Array with all params, array with all data, array with all log probabilities
TODO check: are those really log probabilities?
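    Example:
        A small numeric sketch of the uniform walker initialization performed
        below; ``central_param`` and ``param_limits`` are made-up values for
        illustration::

            import numpy as np

            central_param = np.array([1.0, 2.0])
            param_limits = np.array([[0.0, 3.0], [0.0, 5.0]])
            # element-wise distance to the closer sampling limit
            d_min = np.minimum(
                central_param - param_limits[:, 0],
                param_limits[:, 1] - central_param,
            )  # -> array([1., 2.])
            # 10 walkers start uniformly in central_param +/- d_min / 2
            walkers = central_param + d_min * (np.random.rand(10, 2) - 0.5)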
"""
# Calculate the standard deviation of the data for each dimension
data_stdevs = calc_kernel_width(data)
sampling_dim = slice.shape[0]
    # Initialize each walker at a uniformly drawn random parameter close to the central parameter.
    # The box width per dimension is the element-wise minimum of the distances from the central parameter
    # to the lower and the upper sampling limit.
d_min = np.minimum(
model.central_param - model.param_limits[:, 0],
model.param_limits[:, 1] - model.central_param,
)
initial_walker_positions = model.central_param[slice] + d_min[slice] * (
np.random.rand(num_walkers, sampling_dim) - 0.5
)
# Count and print how many runs have already been performed for this model
num_existing_files = result_manager.count_emcee_sub_runs(slice)
logger.debug(f"{num_existing_files} existing files found")
    # Loop over the remaining sub runs and continue the counter where it ended.
for run in range(num_existing_files, num_existing_files + num_runs):
logger.info(f"Run {run} of {num_runs}")
# If there are current walker positions defined by runs before this one, use them.
        position_path = path.join(
            result_manager.get_slice_path(slice), "currentPos.csv"
        )
if path.isfile(position_path):
initial_walker_positions = np.loadtxt(
position_path,
delimiter=",",
ndmin=2,
)
logger.info(
f"Continue sampling from saved sampler position in {position_path}"
)
# Run the sampler.
with warnings.catch_warnings():
# This warning is raised when the model returned a -inf value for the log probability, e.g. because the parameters are out of bounds.
# We want to ignore this warning, because the sampler will handle this case correctly.
# NaN values and other errors are not affected by this.
warnings.filterwarnings(
"ignore",
module="red_blue",
category=RuntimeWarning,
message="invalid value encountered in scalar subtract",
)
sampler_results, final_walker_positions = run_emcee_once(
model,
data,
data_transformation,
data_stdevs,
slice,
initial_walker_positions,
num_walkers,
num_steps,
num_processes,
)
result_manager.save_run(
model, slice, run, sampler_results, final_walker_positions
)
(
overall_params,
overall_sim_results,
overall_density_evals,
) = result_manager.load_slice_inference_results(
slice, num_burn_in_samples, thinning_factor
)
result_manager.save_overall(
slice,
overall_params,
overall_sim_results,
overall_density_evals,
)
return (
overall_params,
overall_sim_results,
overall_density_evals,
)
def calc_walker_acceptance(
model: BaseModel,
slice: np.ndarray,
num_walkers: int,
num_burn_in_samples: int,
result_manager: ResultManager,
) -> np.ndarray:
"""Calculate the acceptance ratio for each individual walker of the emcee chain.
This is especially important to find "zombie" walkers, that are never moving.
Args:
model (BaseModel): The model for which the acceptance ratio should be calculated
slice (np.ndarray): slice for which the acceptance ratio should be calculated
num_walkers (int): number of walkers in the emcee chain
        num_burn_in_samples (int): Number of samples that will be deleted (burned) per chain (i.e. walker). Only for MCMC inference.
result_manager (ResultManager): ResultManager to load the results from
Returns:
np.ndarray: Array with the acceptance ratio for each walker
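    Example:
        A hand-sized sketch of the acceptance counting used below: a step counts
        as accepted whenever any parameter component changed between consecutive
        positions (the values are made up for illustration)::

            import numpy as np

            # 3 stored positions (2 steps) of 2 walkers with 1 parameter each
            params = np.array(
                [
                    [[0.1], [0.5]],
                    [[0.1], [0.7]],  # walker 0 rejected, walker 1 accepted
                    [[0.2], [0.7]],  # walker 0 accepted, walker 1 rejected
                ]
            )
            num_accepted = np.sum(
                np.any(params[1:] != params[:-1], axis=2), axis=0
            )  # -> array([1, 1])
            acceptance_ratios = num_accepted / 2  # -> array([0.5, 0.5])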
"""
# load the emcee parameter chain
(
params,
sim_res,
density_evals,
) = result_manager.load_slice_inference_results(
slice, num_burn_in_samples, 1
)
# calculate the number of steps each walker walked
# subtract 1 because we count the steps between the parameters
num_steps = int(params.shape[0] / num_walkers) - 1
# Unflatten the parameter chain and count the number of accepted steps for each walker
params = params.reshape(num_steps + 1, num_walkers, model.param_dim)
# Build a boolean array that is true if the parameters of the current step are the same as the parameters of the next step and sum over it
# If the parameters are the same, the step is not accepted and we add 0 to the number of accepted steps
# If the parameters are different, the step is accepted and we add 1 to the number of accepted steps
numAcceptedSteps = np.sum(
np.any(params[1:, :, :] != params[:-1, :, :], axis=2),
axis=0,
)
# calculate the acceptance ratio by dividing the number of accepted steps by the overall number of steps
acceptanceRatios = numAcceptedSteps / num_steps
return acceptanceRatios
def inference_mcmc(
model: BaseModel,
data: np.ndarray,
data_transformation: DataTransformation,
result_manager: ResultManager,
slices: list[np.ndarray],
num_processes: int,
num_runs: int = 1,
num_walkers: int = 10,
num_steps: int = 2500,
num_burn_in_samples: typing.Optional[int] = None,
thinning_factor: typing.Optional[int] = None,
get_walker_acceptance: bool = False,
) -> typing.Tuple[
typing.Dict[str, np.ndarray],
typing.Dict[str, np.ndarray],
typing.Dict[str, np.ndarray],
ResultManager,
]:
"""This function runs a MCMC sampling for the given model and data.
Args:
model (BaseModel): The model describing the mapping from parameters to data.
data (np.ndarray): The data to be used for the inference.
data_transformation (DataTransformation): The data transformation used to normalize the data.
result_manager (ResultManager): The result manager to be used for the inference.
        slices (list[np.ndarray]): A list of slices to be used for the inference.
num_processes (int): The number of processes to be used for the inference.
num_runs (int, optional): The number of runs to be used for the inference. For each run except the first, all walkers continue with the end position of the previous run - this parameter does not affect the number of Markov chains, but how often results for each chain are saved. Defaults to 1.
num_walkers (int, optional): The number of walkers to be used for the inference. Corresponds to the number of Markov chains. Defaults to 10.
num_steps (int, optional): The number of steps to be used for the inference. Defaults to 2500.
        num_burn_in_samples (int, optional): Number of samples to be discarded as burn-in. Defaults to None, which means a burn-in of 10% of the total number of samples.
        thinning_factor (int, optional): Thinning factor for the samples. Defaults to None, which means no thinning.
get_walker_acceptance (bool, optional): If True, the acceptance rate of the walkers is calculated and printed. Defaults to False.
Returns:
Tuple[typing.Dict[str, np.ndarray], typing.Dict[str, np.ndarray], typing.Dict[str, np.ndarray], ResultManager]: The parameter samples, the corresponding simulation results, the corresponding density
evaluations for each slice and the result manager used for the inference.
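    Example:
        A minimal usage sketch; ``MyModel``, ``my_data``, ``my_transformation``
        and ``my_result_manager`` are hypothetical stand-ins for a concrete
        :py:class:`BaseModel <eulerpi.core.models.BaseModel>` subclass, a data
        array, a :py:class:`DataTransformation <eulerpi.core.data_transformations.DataTransformation>`
        and a :py:class:`ResultManager <eulerpi.core.result_manager.ResultManager>`::

            import numpy as np

            model = MyModel()  # hypothetical BaseModel subclass
            params, sim_results, density_evals, result_manager = inference_mcmc(
                model=model,
                data=my_data,
                data_transformation=my_transformation,
                result_manager=my_result_manager,
                slices=[np.arange(model.param_dim)],  # sample the full parameter space
                num_processes=4,
            )
            # the returned dictionaries are keyed by slice name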
"""
# Set default values for burn in and thinning factor
if num_burn_in_samples is None:
num_burn_in_samples = int(num_runs * num_steps * 0.1)
if thinning_factor is None:
thinning_factor = 1
# create the return dictionaries
overall_params, overall_sim_results, overall_density_evals = {}, {}, {}
for slice in slices:
slice_name = result_manager.get_slice_name(slice)
result_manager.save_inference_information(
slice=slice,
model=model,
inference_type=InferenceType.MCMC,
num_processes=num_processes,
num_runs=num_runs,
num_walkers=num_walkers,
num_steps=num_steps,
num_burn_in_samples=num_burn_in_samples,
thinning_factor=thinning_factor,
)
(
overall_params[slice_name],
overall_sim_results[slice_name],
overall_density_evals[slice_name],
) = run_emcee_sampling(
model=model,
data=data,
data_transformation=data_transformation,
slice=slice,
result_manager=result_manager,
num_runs=num_runs,
num_walkers=num_walkers,
num_steps=num_steps,
num_burn_in_samples=num_burn_in_samples,
thinning_factor=thinning_factor,
num_processes=num_processes,
)
if get_walker_acceptance:
            # Use a short burn-in (1% of all steps) when computing the acceptance
            num_burn_in_steps = int(num_steps * num_runs * 0.01)
acceptance = calc_walker_acceptance(
model, slice, num_walkers, num_burn_in_steps, result_manager
)
logger.info(f"Acceptance rate for slice {slice}: {acceptance}")
return (
overall_params,
overall_sim_results,
overall_density_evals,
result_manager,
)