"""This module implements the random variable transformation of the EPI algorithm."""fromtypingimportTupleimportjax.numpyasjnpimportnumpyasnpfromjaximportjitfromeulerpiimportloggerfromeulerpi.core.data_transformationsimportDataTransformationfromeulerpi.core.kdeimporteval_kde_gaussfromeulerpi.core.modelsimportBaseModel
def evaluate_density(
    param: np.ndarray,
    model: BaseModel,
    data: np.ndarray,
    data_transformation: DataTransformation,
    data_stdevs: np.ndarray,
    slice: np.ndarray,
) -> Tuple[np.double, np.ndarray]:
    """Calculate the parameter density as backtransformed data density using the simulation model

    .. math::

        \\Phi_\\mathcal{Q}(q) = \\Phi_\\mathcal{Y}(s(q)) \\cdot \\sqrt{\\det \\left({\\frac{ds}{dq}(q)}^\\intercal {\\frac{ds}{dq}(q)}\\right)}

    Args:
        param (np.ndarray): parameter for which the transformed density shall be evaluated
        model (BaseModel): model to be evaluated
        data (np.ndarray): data for the model. 2D array with shape (#num_data_points, #data_dim)
        data_transformation (DataTransformation): The data transformation used to normalize the data.
        data_stdevs (np.ndarray): array of suitable kernel widths for each data dimension
        slice (np.ndarray): slice of the parameter vector that is to be evaluated

    Returns:
        Tuple[np.double, np.ndarray]:
            : parameter density at the point param
            : vector containing the parameter, the simulation result and the density

    Examples:

    .. code-block:: python

        import numpy as np

        from eulerpi.examples.heat import Heat
        from eulerpi.core.kde import calc_kernel_width
        from eulerpi.core.data_transformations import DataIdentity
        from eulerpi.core.transformations import evaluate_density

        # use the heat model
        model = Heat()

        # generate 1000 artificial, 5D data points for the Heat example model
        data_mean = np.array([0.5, 0.1, 0.5, 0.9, 0.5])
        data = np.random.randn(1000, 5) / 25.0 + data_mean

        # evaluate the parameter probability density at the central parameter of the Heat model
        eval_param = model.central_param

        # calculate the kernel widths for the data based on Silverman's rule of thumb
        data_stdevs = calc_kernel_width(data)

        # evaluate the three-variate joint density
        slice = np.array([0, 1, 2])
        (central_param_density, all_res) = evaluate_density(
            param=eval_param,
            model=model,
            data=data,
            data_transformation=DataIdentity(),  # no data transformation
            data_stdevs=data_stdevs,
            slice=slice,
        )

        # all_res is the concatenation of the evaluated parameter, the simulation result
        # arising from that parameter and the inferred parameter density. Decompose as follows:
        eval_param = all_res[0 : model.param_dim]
        sim_result = all_res[model.param_dim : model.param_dim + model.data_dim]
        central_param_density = all_res[-1]

    """
    # Build the full parameter vector for evaluation based on the passed param slice and the constant central point
    fullParam = model.central_param.copy()
    fullParam[slice] = param

    # Check if the tried parameter is within the just-defined bounds and return the lowest possible density if not.
    if (
        np.any(param < model.param_limits[slice, 0])
        or np.any(param > model.param_limits[slice, 1])
        or not model.param_is_within_domain(fullParam)
    ):
        logger.info(
            "Parameters outside of predefined range"
        )  # Slows down the sampling too much? -> Change logger level to warning or even error
        return 0.0, np.zeros(slice.shape[0] + model.data_dim + 1)

    # If the parameter is within the valid ranges...
    else:
        # Try evaluating the model for the given parameters.
        # Evaluating the model and the jacobian for the specified parameter simultaneously provides a little speedup over calculating them separately in some cases.
        try:
            sim_res, model_jac = model.forward_and_jacobian(fullParam)
        except Exception as e:
            logger.error(
                "Error while evaluating model and its jacobian."
                "The program will continue, but the density will be set to 0."
                f"The parameter that caused the error is: {fullParam}"
                f"The error message is: {e}"
            )
            return 0, np.zeros(slice.shape[0] + model.data_dim + 1)

        # normalize sim_res
        transformed_sim_res = data_transformation.transform(sim_res)

        # Evaluate the data density in the simulation result.
        densityEvaluation = eval_kde_gauss(data, transformed_sim_res, data_stdevs)

        # Calculate the simulation model's pseudo-determinant in the parameter point (also called the correction factor).
        # Scale with the determinant of the transformation matrix.
        transformation_jac = data_transformation.jacobian(sim_res)
        correction = calc_gram_determinant(
            jnp.dot(
                transformation_jac, model_jac
            )  # We use dot, because matmul does not support scalars
        )

        # Multiply data density and correction factor.
        trafo_density_evaluation = densityEvaluation * correction

        # Store the current parameter, its simulation result as well as its density in a large vector that is stored separately by emcee.
        evaluation_results = np.concatenate(
            (param, sim_res, np.array([trafo_density_evaluation]))
        )

        return trafo_density_evaluation, evaluation_results
def eval_log_transformed_density(
    param: np.ndarray,
    model: BaseModel,
    data: np.ndarray,
    data_transformation: DataTransformation,
    data_stdevs: np.ndarray,
    slice: np.ndarray,
) -> Tuple[np.double, np.ndarray]:
    """Calculate the logarithmic parameter density as backtransformed data density using the simulation model

    .. math::

        \\log{\\Phi_\\mathcal{Q}(q)} := \\begin{cases} \\log{\\Phi_\\mathcal{Q}(q)} \\quad \\text{if } \\Phi_\\mathcal{Q}(q) > 0 \\\\ -\\infty \\quad \\text{else} \\end{cases}

    Args:
        param (np.ndarray): parameter for which the transformed density shall be evaluated
        model (BaseModel): model to be evaluated
        data (np.ndarray): data for the model. 2D array with shape (#num_data_points, #data_dim)
        data_transformation (DataTransformation): The data transformation used to normalize the data.
        data_stdevs (np.ndarray): array of suitable kernel widths for each data dimension
        slice (np.ndarray): slice of the parameter vector that is to be evaluated

    Returns:
        Tuple[np.double, np.ndarray]:
            : natural log of the parameter density at the point param
            : sampler_results (array concatenation of parameters, simulation results and evaluated density, stored as "blob" by the emcee sampler)

    """
    trafo_density_evaluation, evaluation_results = evaluate_density(
        param, model, data, data_transformation, data_stdevs, slice
    )
    if trafo_density_evaluation == 0:
        return -np.inf, evaluation_results
    return np.log(trafo_density_evaluation), evaluation_results
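# Illustrative sketch (commented out, not part of this module's API): because
# eval_log_transformed_density returns the (log_prob, blob) pair expected by emcee,
# it can be passed directly to an emcee.EnsembleSampler. The Heat example model, the
# identity data transformation and the walker/step counts below are assumptions that
# mirror the evaluate_density docstring, not a prescribed configuration.
#
#     import emcee
#     import numpy as np
#     from eulerpi.core.data_transformations import DataIdentity
#     from eulerpi.core.kde import calc_kernel_width
#     from eulerpi.examples.heat import Heat
#
#     model = Heat()
#     data = np.random.randn(1000, 5) / 25.0 + np.array([0.5, 0.1, 0.5, 0.9, 0.5])
#     data_stdevs = calc_kernel_width(data)
#     slice = np.array([0, 1, 2])  # infer the joint density of the first three parameters
#
#     n_walkers = 10
#     # start all walkers close to the central parameter of the sliced dimensions
#     initial_state = model.central_param[slice] + 0.01 * np.random.randn(
#         n_walkers, slice.shape[0]
#     )
#     sampler = emcee.EnsembleSampler(
#         n_walkers,
#         slice.shape[0],
#         eval_log_transformed_density,
#         args=[model, data, DataIdentity(), data_stdevs, slice],
#     )
#     sampler.run_mcmc(initial_state, 100)
#
#     # Each blob concatenates the evaluated parameter, the simulation result and the density.
#     blobs = sampler.get_blobs(flat=True)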
def calc_gram_determinant(jac: jnp.ndarray) -> jnp.double:
    """Evaluate the pseudo-determinant of the jacobian

    .. math::

        \\sqrt{\\det \\left({\\frac{ds}{dq}(q)}^\\intercal {\\frac{ds}{dq}(q)}\\right)}

    .. warning::

        The pseudo-determinant of the model jacobian serves as a correction term in the :py:func:`evaluate_density <evaluate_density>` function.
        Therefore this function returns 0 if the result is not finite.

    Args:
        jac (jnp.ndarray): The jacobian for which the pseudo-determinant shall be calculated

    Returns:
        jnp.double: The pseudo-determinant of the jacobian. Returns 0 if the result is not finite.

    Examples:

    .. code-block:: python

        import jax.numpy as jnp

        from eulerpi.core.transformations import calc_gram_determinant

        jac = jnp.array([[1, 2], [3, 4], [5, 6], [7, 8]])
        pseudo_det = calc_gram_determinant(jac)

    """
    correction = _calc_gram_determinant(jac)
    # If the correction factor is not finite, return 0 instead to not affect the sampling.
    if not jnp.isfinite(correction):
        correction = 0.0
        logger.warning("Invalid value encountered for correction factor")
    return correction
@jit
def _calc_gram_determinant(jac: jnp.ndarray) -> jnp.double:
    """Jitted calculation of the pseudo-determinant of the jacobian.

    This function is called by calc_gram_determinant() and should not be called directly.
    It does not check whether the correction factor is finite.
    It is not much faster than a comparable numpy version, but it can run on the GPU and may be slightly faster because the sequence of operations is jit-compiled.

    Args:
        jac (jnp.ndarray): The jacobian for which the pseudo-determinant shall be calculated

    Returns:
        jnp.double: The pseudo-determinant of the jacobian
    """
    jac = jnp.atleast_2d(jac)

    if jac.shape[0] == jac.shape[1]:
        return jnp.abs(jnp.linalg.det(jac))
    else:
        jacT = jnp.transpose(jac)
        # The pseudo-determinant is calculated as the square root of the determinant of the matrix product of the jacobian and its transpose.
        # For numerical reasons, one can regularize the matrix product by adding a diagonal matrix of ones before calculating the determinant.
        # correction = np.sqrt(np.linalg.det(np.matmul(jacT, jac) + np.eye(param.shape[0])))
        correction = jnp.sqrt(jnp.linalg.det(jnp.matmul(jacT, jac)))
        return correction
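# Illustrative check (commented out, assumptions only): for a square jacobian the Gram
# determinant sqrt(det(J^T J)) equals |det(J)|, which is why the square case above can
# take the cheaper direct route; for a tall jacobian only the Gram form is defined.
#
#     import jax.numpy as jnp
#
#     J_square = jnp.array([[1.0, 2.0], [3.0, 4.0]])
#     assert jnp.isclose(
#         jnp.sqrt(jnp.linalg.det(J_square.T @ J_square)),
#         jnp.abs(jnp.linalg.det(J_square)),
#     )
#
#     J_tall = jnp.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])  # 3x2 jacobian
#     print(calc_gram_determinant(J_tall))  # sqrt(det(J^T J)) = sqrt(3), roughly 1.732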