__author__ = "sibirrer"
from lenstronomy.Sampling.Likelihoods.time_delay_likelihood import TimeDelayLikelihood
from lenstronomy.Sampling.Likelihoods.image_likelihood import ImageLikelihood
from lenstronomy.Sampling.Likelihoods.tracer_likelihood import TracerLikelihood
from lenstronomy.Sampling.Likelihoods.position_likelihood import PositionLikelihood
from lenstronomy.Sampling.Likelihoods.flux_ratio_likelihood import FluxRatioLikelihood
from lenstronomy.Sampling.Likelihoods.prior_likelihood import PriorLikelihood
from lenstronomy.Sampling.Likelihoods.kinematic_2D_likelihood import KinLikelihood
import lenstronomy.Util.class_creator as class_creator
import numpy as np
__all__ = ["LikelihoodModule"]
[docs]
class LikelihoodModule(object):
"""This class contains the routines to run a MCMC process.
the key components are:
- imSim_class: an instance of a class that simulates one (or more) images and returns the likelihood, such as
ImageModel(), Multiband(), MultiExposure()
- param_class: instance of a Param() class that can cast the sorted list of parameters that are sampled into the
conventions of the imSim_class
Additional arguments are supported for adding a time-delay likelihood etc (see __init__ definition)
"""
[docs]
def __init__(
self,
kwargs_data_joint,
kwargs_model,
param_class,
image_likelihood=True,
check_bounds=True,
check_matched_source_position=False,
astrometric_likelihood=False,
image_position_likelihood=False,
source_position_likelihood=False,
image_position_uncertainty=0.004,
check_positive_flux=False,
source_position_tolerance=0.001,
source_position_sigma=0.001,
force_no_add_image=False,
source_marg=False,
linear_prior=None,
restrict_image_number=False,
max_num_images=None,
bands_compute=None,
time_delay_likelihood=False,
image_likelihood_mask_list=None,
flux_ratio_likelihood=False,
kwargs_flux_compute=None,
prior_lens=None,
prior_source=None,
prior_extinction=None,
prior_lens_light=None,
prior_ps=None,
prior_special=None,
prior_lens_kde=None,
prior_source_kde=None,
prior_lens_light_kde=None,
prior_ps_kde=None,
prior_special_kde=None,
prior_extinction_kde=None,
prior_lens_lognormal=None,
prior_source_lognormal=None,
prior_extinction_lognormal=None,
prior_lens_light_lognormal=None,
prior_ps_lognormal=None,
prior_special_lognormal=None,
custom_logL_addition=None,
kwargs_pixelbased=None,
kinematic_2d_likelihood=False,
kin_lens_idx=0,
kin_lens_light_idx=0,
tracer_likelihood=False,
tracer_likelihood_mask=None,
):
"""Initializing class.
:param param_class: instance of a Param() class that can cast the sorted list of
parameters that are sampled into the conventions of the imSim_class
:param image_likelihood: bool, option to compute the imaging likelihood
:param source_position_likelihood: bool, if True, ray-traces image positions
back to source plane and evaluates relative errors in respect ot the
position_uncertainties in the image plane
:param check_bounds: bool, option to punish the hard bounds in parameter space
:param check_matched_source_position: bool, option to check whether point source
position of solver finds a solution to match all the image positions in the
same source plane coordinate
:param astrometric_likelihood: bool, additional likelihood term of the predicted
vs modelled point source position
:param image_position_uncertainty: float, 1-sigma Gaussian uncertainty on the
point source position (only used if point_source_likelihood=True)
:param check_positive_flux: bool, option to punish models that do not have all
positive linear amplitude parameters
:param source_position_tolerance: float, punishment of check_solver occurs when
image positions are predicted further away than this number
:param image_likelihood_mask_list: list of boolean 2d arrays of size of images
marking the pixels to be evaluated in the likelihood
:param force_no_add_image: bool, if True: computes ALL image positions of the
point source. If there are more images predicted than modelled, a punishment
occurs
:param source_marg: marginalization addition on the imaging likelihood based on
the covariance of the inferred linear coefficients
:param linear_prior: float or list of floats (when multi-linear setting is
chosen) indicating the range of linear amplitude priors when computing the
marginalization term.
:param restrict_image_number: bool, if True: computes ALL image positions of the
point source. If there are more images predicted than indicated in
max_num_images, a punishment occurs
:param max_num_images: int, see restrict_image_number
:param bands_compute: list of bools with same length as data objects, indicates
which "band" to include in the fitting
:param time_delay_likelihood: bool, if True computes the time-delay likelihood
of the FIRST point source
:param kwargs_flux_compute: keyword arguments of how to compute the image
position fluxes (see FluxRatioLikeliood)
:param custom_logL_addition: a definition taking as arguments (kwargs_lens,
kwargs_source, kwargs_lens_light, kwargs_ps, kwargs_special,
kwargs_extinction) and returns a logL (punishing) value.
:param kwargs_pixelbased: keyword arguments with various settings related to the
pixel-based solver (see SLITronomy documentation)
:param kinematic_2d_likelihood: bool, option to compute the kinematic likelihood
:param tracer_likelihood: option to perform likelihood on tracer quantity derived from imaging or spectroscopy
"""
# TODO unpack also tracer model from kwargs_data
(
multi_band_list,
multi_band_type,
time_delays_measured,
time_delays_uncertainties,
flux_ratios,
flux_ratio_errors,
ra_image_list,
dec_image_list,
kinematic_data,
tracer_data,
) = self._unpack_data(**kwargs_data_joint)
if len(multi_band_list) == 0:
image_likelihood = False
self.kinematic_data = kinematic_data
self.param = param_class
self._lower_limit, self._upper_limit = self.param.param_limits()
self._prior_likelihood = PriorLikelihood(
prior_lens,
prior_source,
prior_lens_light,
prior_ps,
prior_special,
prior_extinction,
prior_lens_kde,
prior_source_kde,
prior_lens_light_kde,
prior_ps_kde,
prior_special_kde,
prior_extinction_kde,
prior_lens_lognormal,
prior_source_lognormal,
prior_lens_light_lognormal,
prior_ps_lognormal,
prior_special_lognormal,
prior_extinction_lognormal,
)
self._time_delay_likelihood = time_delay_likelihood
self._image_likelihood = image_likelihood
self._flux_ratio_likelihood = flux_ratio_likelihood
self._tracer_likelihood = tracer_likelihood
self._kinematic_2D_likelihood = kinematic_2d_likelihood
if kwargs_flux_compute is None:
kwargs_flux_compute = {}
linear_solver = self.param.linear_solver
self._kwargs_flux_compute = kwargs_flux_compute
self._check_bounds = check_bounds
self._custom_logL_addition = custom_logL_addition
self._kwargs_time_delay = {
"time_delays_measured": time_delays_measured,
"time_delays_uncertainties": time_delays_uncertainties,
}
self._kwargs_image_likelihood = {
"source_marg": source_marg,
"linear_prior": linear_prior,
"check_positive_flux": check_positive_flux,
"kwargs_pixelbased": kwargs_pixelbased,
"linear_solver": linear_solver,
}
self._kwargs_image_sim = {
"multi_band_list": multi_band_list,
"multi_band_type": multi_band_type,
"bands_compute": bands_compute,
"image_likelihood_mask_list": image_likelihood_mask_list,
}
self._kwargs_position = {
"astrometric_likelihood": astrometric_likelihood,
"image_position_likelihood": image_position_likelihood,
"source_position_likelihood": source_position_likelihood,
"ra_image_list": ra_image_list,
"dec_image_list": dec_image_list,
"image_position_uncertainty": image_position_uncertainty,
"check_matched_source_position": check_matched_source_position,
"source_position_tolerance": source_position_tolerance,
"source_position_sigma": source_position_sigma,
"force_no_add_image": force_no_add_image,
"restrict_image_number": restrict_image_number,
"max_num_images": max_num_images,
}
self._kwargs_tracer = {
"tracer_data": tracer_data,
"tracer_likelihood_mask": tracer_likelihood_mask,
"linear_solver": linear_solver,
}
self._kwargs_flux = {
"flux_ratios": flux_ratios,
"flux_ratio_errors": flux_ratio_errors,
}
self._kwargs_flux.update(self._kwargs_flux_compute)
if self._kinematic_2D_likelihood is True:
print(
"Note that the 2D kinematic likelihood assumes that the lens and lens light have the same center and orientation"
)
if len(multi_band_list) > 1:
print(
"Kinematic Likelihood not meant for multiband, using first band by default"
)
if kwargs_model["lens_model_list"][kin_lens_idx] not in ["EPL_Q_PHI"]:
print(
"Lens for kinematic is not EPL_Q_PHI, the 2D kinematic likelihood will break."
)
if kwargs_model["lens_light_model_list"][kin_lens_light_idx] not in [
"SERSIC_ELLIPSE_Q_PHI"
]:
print(
"Lens light for kinematic is not SERSIC_ELLIPSE_Q_PHI , the 2D kinematic likelihood will break."
)
self._kin_lens_idx = kin_lens_idx
self._kin_lens_light_idx = kin_lens_light_idx
self._class_instances(
kwargs_model=kwargs_model,
kwargs_image_sim=self._kwargs_image_sim,
kwargs_image_likelihood=self._kwargs_image_likelihood,
kwargs_position=self._kwargs_position,
kwargs_flux=self._kwargs_flux,
kwargs_time_delay=self._kwargs_time_delay,
kinematic_data=self.kinematic_data,
kwargs_tracer=self._kwargs_tracer,
)
@property
def kwargs_imaging(self):
"""Dictionary of imaging model keyword arguments.
:return: kwargs_imaging
"""
kwargs_imaging = {**self._kwargs_image_likelihood, **self._kwargs_image_sim}
return kwargs_imaging
def _class_instances(
self,
kwargs_model,
kwargs_image_sim,
kwargs_image_likelihood,
kwargs_position,
kwargs_flux,
kwargs_time_delay,
kinematic_data,
kwargs_tracer,
):
"""
:param kwargs_model: lenstronomy model keyword arguments
:param kwargs_imaging: keyword arguments for imaging likelihood
:param kwargs_position: keyword arguments for positional likelihood
:param kwargs_flux: keyword arguments for flux ratio likelihood
:param kwargs_time_delay: keyword arguments for time delay likelihood
:param kinematic_data: kinematic class for kinematic likelihood
:return: updated model instances of this class
"""
# TODO: in case lens model or point source models are only applied on partial images, then this current class
# has ambiguities when it comes to position likelihood, time-delay likelihood and flux ratio likelihood
(
lens_model_class,
_,
lens_light_model_class,
point_source_class,
_,
) = class_creator.create_class_instances(all_models=True, **kwargs_model)
self.PointSource = point_source_class
if self._time_delay_likelihood is True:
self.time_delay_likelihood = TimeDelayLikelihood(
lens_model_class=lens_model_class,
point_source_class=point_source_class,
**kwargs_time_delay
)
if self._image_likelihood is True:
kwargs_imaging = {**kwargs_image_likelihood, **kwargs_image_sim}
self.image_likelihood = ImageLikelihood(
kwargs_model=kwargs_model, **kwargs_imaging
)
if self._tracer_likelihood is True:
self.tracer_likelihood = TracerLikelihood(
kwargs_model=kwargs_model,
kwargs_imaging=kwargs_image_sim,
**kwargs_tracer
)
self._position_likelihood = PositionLikelihood(
point_source_class, **kwargs_position
)
if self._flux_ratio_likelihood is True:
self.flux_ratio_likelihood = FluxRatioLikelihood(
lens_model_class, **kwargs_flux
)
if self._kinematic_2D_likelihood is True:
kwargs_imaging = {**kwargs_image_likelihood, **kwargs_image_sim}
self.kinematic_2D_likelihood = KinLikelihood(
kinematic_data,
lens_model_class,
lens_light_model_class,
kwargs_imaging["multi_band_list"][0][0],
self._kin_lens_idx,
self._kin_lens_light_idx,
)
def __call__(self, a):
return self.logL(a)
[docs]
def logL(self, args, verbose=False):
"""Routine to compute X2 given variable parameters for a MCMC/PSO chain.
:param args: ordered parameter values that are being sampled
:type args: tuple or list of floats
:param verbose: if True, makes print statements about individual likelihood
components
:type verbose: boolean
:returns: log likelihood of the data given the model (natural logarithm)
"""
# extract parameters
kwargs_return = self.param.args2kwargs(args)
if self._check_bounds is True:
penalty, bound_hit = self.check_bounds(
args, self._lower_limit, self._upper_limit, verbose=verbose
)
if bound_hit is True:
return -(10**15)
return self.log_likelihood(kwargs_return, verbose=verbose)
[docs]
def log_likelihood(self, kwargs_return, verbose=False):
"""
:param kwargs_return: need to contain 'kwargs_lens', 'kwargs_source', 'kwargs_lens_light', 'kwargs_ps',
'kwargs_special'. These entries themselves are lists of keyword argument of the parameters entering the model
to be evaluated
:type kwargs_return: keyword arguments
:param verbose: if True, makes print statements about individual likelihood components
:type verbose: boolean
:returns:
- logL (float) log likelihood of the data given the model (natural logarithm)
"""
kwargs_lens, kwargs_source, kwargs_lens_light, kwargs_ps, kwargs_special = (
kwargs_return["kwargs_lens"],
kwargs_return["kwargs_source"],
kwargs_return["kwargs_lens_light"],
kwargs_return["kwargs_ps"],
kwargs_return["kwargs_special"],
)
kwargs_tracer_source = kwargs_return["kwargs_tracer_source"]
# update model instance in case of changes affecting it (i.e. redshift sampling in multi-plane)
self._update_model(kwargs_special)
# generate image and computes likelihood
self._reset_point_source_cache(bool_input=True)
logL = 0
if self._image_likelihood is True:
logL_image, param = self.image_likelihood.logL(**kwargs_return)
logL += logL_image
if verbose is True:
print("image logL = %s" % logL_image)
else:
param = None
if self._time_delay_likelihood is True:
logL_time_delay = self.time_delay_likelihood.logL(
kwargs_lens, kwargs_ps, kwargs_special
)
logL += logL_time_delay
if verbose is True:
print("time-delay logL = %s" % logL_time_delay)
if self._flux_ratio_likelihood is True:
ra_image_list, dec_image_list = self.PointSource.image_position(
kwargs_ps=kwargs_ps, kwargs_lens=kwargs_lens
)
x_pos, y_pos = ra_image_list[0], dec_image_list[0]
logL_flux_ratios = self.flux_ratio_likelihood.logL(
x_pos, y_pos, kwargs_lens, kwargs_special
)
logL += logL_flux_ratios
if verbose is True:
print("flux ratio logL = %s" % logL_flux_ratios)
if self._kinematic_2D_likelihood is True:
logL_kinematic_2d = self.kinematic_2D_likelihood.logL(
kwargs_lens, kwargs_lens_light, kwargs_special
)
logL += logL_kinematic_2d
if verbose is True:
print("kinematic logL = %s" % logL_kinematic_2d)
logL += self._position_likelihood.logL(
kwargs_lens, kwargs_ps, kwargs_special, verbose=verbose
)
if self._tracer_likelihood is True:
logL_tracer = self.tracer_likelihood.logL(param=param, **kwargs_return)
if verbose is True:
print("tracer logL = %s" % logL_tracer)
logL += logL_tracer
logL_prior = self._prior_likelihood.logL(**kwargs_return)
logL += logL_prior
if verbose is True:
print("Prior likelihood = %s" % logL_prior)
if self._custom_logL_addition is not None:
logL_cond = self._custom_logL_addition(**kwargs_return)
logL += logL_cond
if verbose is True:
print("custom added logL = %s" % logL_cond)
self._reset_point_source_cache(bool_input=False)
return logL # , None
[docs]
@staticmethod
def check_bounds(args, lowerLimit, upperLimit, verbose=False):
"""Checks whether the parameter vector has left its bound, if so, adds a big
number."""
penalty = 0.0
bound_hit = False
args = np.atleast_1d(args)
for i in range(0, len(args)):
if args[i] < lowerLimit[i] or args[i] > upperLimit[i]:
penalty = 10.0**5
bound_hit = True
if verbose is True:
print(
"parameter %s with value %s hit the bounds [%s, %s] "
% (i, args[i], lowerLimit[i], upperLimit[i])
)
return penalty, bound_hit
return penalty, bound_hit
@property
def num_data(self):
"""
:return: number of independent data points in the combined fitting
"""
num_data = 0
if self._image_likelihood is True:
num_data += self.image_likelihood.num_data
if self._time_delay_likelihood is True:
num_data += self.time_delay_likelihood.num_data
if self._flux_ratio_likelihood is True:
num_data += self.flux_ratio_likelihood.num_data
num_data += self._position_likelihood.num_data
if self._tracer_likelihood is True:
num_data += self.tracer_likelihood.num_data
return num_data
@property
def param_limits(self):
return self._lower_limit, self._upper_limit
[docs]
def effective_num_data_points(self, **kwargs):
"""Returns the effective number of data points considered in the X2 estimation
to compute the reduced X2 value."""
num_linear = 0
if self._image_likelihood is True:
num_linear = self.image_likelihood.num_param_linear(**kwargs)
num_param, param_names = self.param.num_param()
return self.num_data - num_param - num_linear
[docs]
def likelihood(self, a):
return self.logL(a)
[docs]
def negativelogL(self, a):
"""For minimizer function, the negative value of the logl value is requested.
:param a: array of parameters
:return: -logL
"""
return -self.logL(a)
@staticmethod
def _unpack_data(
multi_band_list=None,
multi_band_type="multi-linear",
time_delays_measured=None,
time_delays_uncertainties=None,
flux_ratios=None,
flux_ratio_errors=None,
ra_image_list=None,
dec_image_list=None,
kinematic_data=None,
tracer_data=None,
):
"""
:param multi_band_list: list of [[kwargs_data, kwargs_psf, kwargs_numerics], [], ...]
:param multi_band_type: string, type of multi-plane settings (multi-linear or joint-linear)
:param time_delays_measured: measured time delays (units of days)
:param time_delays_uncertainties: uncertainties in time-delay measurement
:param flux_ratios: flux ratios of point sources
:param flux_ratio_errors: error in flux ratio measurement
:return:
"""
if multi_band_list is None:
multi_band_list = []
if ra_image_list is None:
ra_image_list = []
if dec_image_list is None:
dec_image_list = []
return (
multi_band_list,
multi_band_type,
time_delays_measured,
time_delays_uncertainties,
flux_ratios,
flux_ratio_errors,
ra_image_list,
dec_image_list,
kinematic_data,
tracer_data,
)
def _reset_point_source_cache(self, bool_input=True):
self.PointSource.delete_lens_model_cache()
self.PointSource.set_save_cache(bool_input)
if self._image_likelihood is True:
self.image_likelihood.reset_point_source_cache(bool_input)
if self._tracer_likelihood is True:
self.tracer_likelihood.reset_point_source_cache(bool_input)
def _update_model(self, kwargs_special):
"""Updates lens model instance of this class (and all class instances related to
it) when an update to the modeled redshifts of the deflector and/or source
planes are made.
:param kwargs_special: keyword arguments from SpecialParam() class return of
sampling arguments
:return: None, all class instances updated to recent model
"""
kwargs_model, update_bool = self.param.update_kwargs_model(kwargs_special)
if update_bool is True:
self._class_instances(
kwargs_model=kwargs_model,
kwargs_image_sim=self._kwargs_image_sim,
kwargs_image_likelihood=self._kwargs_image_likelihood,
kwargs_position=self._kwargs_position,
kwargs_flux=self._kwargs_flux,
kwargs_time_delay=self._kwargs_time_delay,
kinematic_data=self.kinematic_data,
kwargs_tracer=self._kwargs_tracer,
)
# TODO remove redundancies with Param() calls updates