from lenstronomy.Workflow.psf_fitting import PsfFitting
from lenstronomy.Workflow.alignment_matching import AlignmentFitting
from lenstronomy.Workflow.flux_calibration import FluxCalibration
from lenstronomy.ImSim.MultiBand.single_band_multi_model import SingleBandMultiModel
from lenstronomy.Workflow.multi_band_manager import MultiBandUpdateManager
from lenstronomy.Sampling.likelihood import LikelihoodModule
from lenstronomy.Sampling.sampler import Sampler
from lenstronomy.Sampling.Samplers.multinest_sampler import MultiNestSampler
from lenstronomy.Sampling.Samplers.polychord_sampler import DyPolyChordSampler
from lenstronomy.Sampling.Samplers.dynesty_sampler import DynestySampler
from lenstronomy.Sampling.Samplers.nautilus_sampler import NautilusSampler
from lenstronomy.Sampling.Samplers.cobaya_sampler import CobayaSampler
import numpy as np
import lenstronomy.Util.analysis_util as analysis_util
__all__ = ["FittingSequence"]
[docs]
class FittingSequence(object):
"""Class to define a sequence of fitting applied, inherit the Fitting class this is
a Workflow manager that allows to update model configurations before executing
another step in the modelling The user can take this module as an example of how to
create their own workflows or build their own around the FittingSequence."""
[docs]
def __init__(
self,
kwargs_data_joint,
kwargs_model,
kwargs_constraints,
kwargs_likelihood,
kwargs_params,
mpi=False,
verbose=True,
):
"""
:param kwargs_data_joint: keyword argument specifying the data according to LikelihoodModule
:param kwargs_model: keyword arguments to describe all model components used in
class_creator.create_class_instances()
:param kwargs_constraints: keyword arguments of the Param() class to handle parameter constraints during the
sampling (except upper and lower limits and sampling input mean and width)
:param kwargs_likelihood: keyword arguments of the Likelihood() class to handle parameters and settings of the
likelihood
:param kwargs_params: setting of the sampling bounds and initial guess mean and spread.
The argument is organized as:
'lens_model': [kwargs_init, kwargs_sigma, kwargs_fixed, kwargs_lower, kwargs_upper]
'source_model': [kwargs_init, kwargs_sigma, kwargs_fixed, kwargs_lower, kwargs_upper]
'lens_light_model': [kwargs_init, kwargs_sigma, kwargs_fixed, kwargs_lower, kwargs_upper]
'point_source_model': [kwargs_init, kwargs_sigma, kwargs_fixed, kwargs_lower, kwargs_upper]
'extinction_model': [kwargs_init, kwargs_sigma, kwargs_fixed, kwargs_lower, kwargs_upper]
'special': [kwargs_init, kwargs_sigma, kwargs_fixed, kwargs_lower, kwargs_upper]
'tracer_source_model': [kwargs_init, kwargs_sigma, kwargs_fixed, kwargs_lower, kwargs_upper]
:param mpi: MPI option (bool), if True, will launch an MPI Pool job for the steps in the fitting sequence where
possible
:param verbose: bool, if True prints temporary results and indicators of the fitting process
"""
self.kwargs_data_joint = kwargs_data_joint
self.multi_band_list = kwargs_data_joint.get("multi_band_list", [])
self.multi_band_type = kwargs_data_joint.get("multi_band_type", "single-band")
self._verbose = verbose
self._mpi = mpi
self._updateManager = MultiBandUpdateManager(
kwargs_model,
kwargs_constraints,
kwargs_likelihood,
kwargs_params,
num_bands=len(self.multi_band_list),
)
self._mcmc_init_samples = None
@property
def kwargs_fixed(self):
"""Returns the updated kwargs_fixed from the update manager.
:return: list of fixed kwargs, see UpdateManager()
"""
return self._updateManager.fixed_kwargs
[docs]
def fit_sequence(self, fitting_list):
"""
:param fitting_list: list of [['string', {kwargs}], ..] with 'string being the specific fitting option and
kwargs being the arguments passed to this option
:return: fitting results
"""
chain_list = []
for i, fitting in enumerate(fitting_list):
fitting_type = fitting[0]
kwargs = fitting[1]
if fitting_type == "restart":
self._updateManager.set_init_state()
elif fitting_type == "update_settings":
self.update_settings(**kwargs)
elif fitting_type == "set_param_value":
self.set_param_value(**kwargs)
elif fitting_type == "fix_not_computed":
self.fix_not_computed(**kwargs)
elif fitting_type == "psf_iteration":
self.psf_iteration(**kwargs)
elif fitting_type == "align_images":
self.align_images(**kwargs)
elif fitting_type == "calibrate_images":
self.flux_calibration(**kwargs)
elif fitting_type == "PSO":
kwargs_result, chain, param = self.pso(**kwargs)
self._updateManager.update_param_state(**kwargs_result)
chain_list.append([fitting_type, chain, param])
elif fitting_type == "SIMPLEX":
kwargs_result = self.simplex(**kwargs)
self._updateManager.update_param_state(**kwargs_result)
chain_list.append([fitting_type, kwargs_result])
elif fitting_type in ["MCMC", "emcee", "zeus"]:
if fitting_type == "MCMC":
print("MCMC selected. Sampling with default option emcee.")
fitting_type = "emcee"
if "init_samples" not in kwargs:
kwargs["init_samples"] = self._mcmc_init_samples
elif kwargs["init_samples"] is None:
kwargs["init_samples"] = self._mcmc_init_samples
mcmc_output = self.mcmc(**kwargs, sampler_type=fitting_type)
kwargs_result = self._result_from_mcmc(mcmc_output)
self._updateManager.update_param_state(**kwargs_result)
chain_list.append(mcmc_output)
elif fitting_type == "Cobaya":
print("Using the Metropolis--Hastings MCMC sampler in Cobaya.")
param_class = self.param_class
kwargs_temp = self._updateManager.parameter_state
mean_start = param_class.kwargs2args(**kwargs_temp)
kwargs_sigma = self._updateManager.sigma_kwargs
sigma_start = np.array(param_class.kwargs2args(**kwargs_sigma))
# pass the likelihood and starting info to the sampler
sampler = CobayaSampler(self.likelihoodModule, mean_start, sigma_start)
# run the sampler
updated_info, sampler_type, best_fit_values = sampler.run(**kwargs)
# change the best-fit values returned by cobaya into lenstronomy kwargs format
best_fit_kwargs = self.param_class.args2kwargs(
best_fit_values, bijective=True
)
# collect the products
mh_output = [updated_info, sampler_type, best_fit_kwargs]
# append the products to the chain list
chain_list.append(mh_output)
elif fitting_type in [
"dynesty",
"dyPolyChord",
"MultiNest",
"nested_sampling",
]:
if fitting_type == "nested_sampling":
print(
"Nested sampling selected. Sampling with default option dynesty."
)
fitting_type = "dynesty"
ns_output = self.nested_sampling(**kwargs, sampler_type=fitting_type)
chain_list.append(ns_output)
elif fitting_type == "Nautilus":
# do importance nested sampling with Nautilus
nautilus = NautilusSampler(
likelihood_module=self.likelihoodModule, mpi=self._mpi, **kwargs
)
points, log_w, log_l, log_z = nautilus.run(**kwargs)
chain_list.append([points, log_w, log_l, log_z])
if kwargs.get("verbose", False):
print(len(points), "number of points sampled")
kwargs_result = self.best_fit_from_samples(points, log_l)
self._updateManager.update_param_state(**kwargs_result)
else:
raise ValueError(
"fitting_sequence {} is not supported. Please use: 'PSO', 'SIMPLEX', "
"'MCMC' or 'emcee', 'zeus', 'Cobaya', "
"'dynesty', 'dyPolyChord', 'Multinest', 'Nautilus, '"
"'psf_iteration', 'restart', 'update_settings', 'calibrate_images' or "
"'align_images'".format(fitting_type)
)
return chain_list
[docs]
def best_fit(self, bijective=False):
"""
:param bijective: bool, if True, the mapping of image2source_plane and the mass_scaling parameterisation are inverted. If you do not use those options, there is no effect.
:return: best fit model of the current state of the FittingSequence class
"""
return self._updateManager.best_fit(bijective=bijective)
[docs]
def update_state(self, kwargs_update):
"""Updates current best fit state to the input model keywords specified.
:param kwargs_update: format of kwargs_result
:return: None
"""
self._updateManager.update_param_state(**kwargs_update)
@property
def best_fit_likelihood(self):
"""Returns the log likelihood of the best fit model of the current state of this
class.
:return: log likelihood, float
"""
kwargs_result = self.best_fit(bijective=False)
param_class = self.param_class
likelihoodModule = self.likelihoodModule
logL = likelihoodModule.logL(param_class.kwargs2args(**kwargs_result))
return logL
@property
def bic(self):
"""Bayesian information criterion (BIC) of the model.
:return: bic value, float
"""
num_data = self.likelihoodModule.num_data
num_param_nonlinear = self.param_class.num_param()[0]
num_param_linear = self.param_class.num_param_linear()
num_param = num_param_nonlinear + num_param_linear
bic = analysis_util.bic_model(self.best_fit_likelihood, num_data, num_param)
return bic
@property
def param_class(self):
"""
:return: Param() class instance reflecting the current state of FittingSequence
"""
return self._updateManager.param_class
@property
def likelihoodModule(self):
"""
:return: Likelihood() class instance reflecting the current state of FittingSequence
"""
kwargs_model = self._updateManager.kwargs_model
kwargs_likelihood = self._updateManager.kwargs_likelihood
likelihoodModule = LikelihoodModule(
self.kwargs_data_joint, kwargs_model, self.param_class, **kwargs_likelihood
)
return likelihoodModule
[docs]
def simplex(self, n_iterations, method="Nelder-Mead"):
"""Downhill simplex optimization using the Nelder-Mead algorithm.
:param n_iterations: maximum number of iterations to perform
:param method: the optimization method used, see documentation in
scipy.optimize.minimize
:return: result of the best fit
"""
param_class = self.param_class
kwargs_temp = self._updateManager.parameter_state
init_pos = param_class.kwargs2args(**kwargs_temp)
sampler = Sampler(likelihoodModule=self.likelihoodModule)
result = sampler.simplex(init_pos, n_iterations, method)
kwargs_result = param_class.args2kwargs(result, bijective=True)
return kwargs_result
[docs]
def mcmc(
self,
n_burn,
n_run,
walkerRatio=None,
n_walkers=None,
sigma_scale=1,
threadCount=1,
init_samples=None,
re_use_samples=True,
sampler_type="EMCEE",
progress=True,
backend_filename=None,
start_from_backend=False,
**kwargs_zeus
):
"""MCMC routine.
:param n_burn: number of burn in iterations (will not be saved)
:param n_run: number of MCMC iterations that are saved
:param walkerRatio: ratio of walkers/number of free parameters
:param n_walkers: integer, number of walkers of emcee (optional, if set, overwrites the walkerRatio input
:param sigma_scale: scaling of the initial parameter spread relative to the width in the initial settings
:param threadCount: number of CPU threads. If MPI option is set, threadCount=1
:param init_samples: initial sample from where to start the MCMC process
:param re_use_samples: bool, if True, re-uses the samples described in init_samples.nOtherwise starts from
scratch.
:param sampler_type: string, which MCMC sampler to be used. Options are 'emcee' and 'zeus'
:param progress: boolean, if True shows progress bar in EMCEE
:param backend_filename: name of the HDF5 file where sampling state is saved (through emcee backend engine)
:type backend_filename: string
:param start_from_backend: if True, start from the state saved in `backup_filename`.
O therwise, create a new backup file with name `backup_filename` (any already existing file is overwritten!).
:type start_from_backend: bool
:param kwargs_zeus: zeus-specific kwargs
:return: list of output arguments, e.g. MCMC samples, parameter names, logL distances of all samples specified
by the specific sampler used
"""
param_class = self.param_class
# run PSO
mcmc_class = Sampler(likelihoodModule=self.likelihoodModule)
kwargs_temp = self._updateManager.parameter_state
mean_start = param_class.kwargs2args(**kwargs_temp)
kwargs_sigma = self._updateManager.sigma_kwargs
sigma_start = np.array(param_class.kwargs2args(**kwargs_sigma)) * sigma_scale
num_param, param_list = param_class.num_param()
if n_walkers is None:
if walkerRatio is None:
raise ValueError(
"MCMC sampler needs either n_walkers or walkerRatio as input argument"
)
n_walkers = num_param * walkerRatio
# run MCMC
if init_samples is not None and re_use_samples is True:
num_samples, num_param_prev = np.shape(init_samples)
if num_param_prev == num_param:
print("re-using previous samples to initialize the next MCMC run.")
idxs = np.random.choice(len(init_samples), n_walkers)
initpos = init_samples[idxs]
else:
raise ValueError(
"Can not re-use previous MCMC samples as number of parameters have changed!"
)
else:
initpos = None
if sampler_type == "zeus":
# check if zeus is specified, if not default to emcee
samples, dist = mcmc_class.mcmc_zeus(
n_walkers,
n_run,
n_burn,
mean_start,
sigma_start,
mpi=self._mpi,
threadCount=threadCount,
progress=progress,
initpos=initpos,
backend_filename=backend_filename,
**kwargs_zeus
)
output = [sampler_type, samples, param_list, dist]
else:
# sample with emcee
samples, dist = mcmc_class.mcmc_emcee(
n_walkers,
n_run,
n_burn,
mean_start,
sigma_start,
mpi=self._mpi,
threadCount=threadCount,
progress=progress,
initpos=initpos,
backend_filename=backend_filename,
start_from_backend=start_from_backend,
)
output = [sampler_type, samples, param_list, dist]
self._mcmc_init_samples = samples # overwrites previous samples to continue from there in the next MCMC run
return output
[docs]
def pso(
self, n_particles, n_iterations, sigma_scale=1, print_key="PSO", threadCount=1
):
"""Particle Swarm Optimization.
:param n_particles: number of particles in the Particle Swarm Optimization
:param n_iterations: number of iterations in the optimization process
:param sigma_scale: scaling of the initial parameter spread relative to the
width in the initial settings
:param print_key: string, printed text when executing this routine
:param threadCount: number of CPU threads. If MPI option is set, threadCount=1
:return: result of the best fit, the PSO chain of the best fit parameter after
each iteration [lnlikelihood, parameters, velocities], list of parameters in
same order as in chain
"""
param_class = self.param_class
kwargs_temp = self._updateManager.parameter_state
init_pos = param_class.kwargs2args(**kwargs_temp)
kwargs_sigma = self._updateManager.sigma_kwargs
sigma_start = param_class.kwargs2args(**kwargs_sigma)
lower_start = np.array(init_pos) - np.array(sigma_start) * sigma_scale
upper_start = np.array(init_pos) + np.array(sigma_start) * sigma_scale
num_param, param_list = param_class.num_param()
# run PSO
sampler = Sampler(likelihoodModule=self.likelihoodModule)
result, chain = sampler.pso(
n_particles,
n_iterations,
lower_start,
upper_start,
init_pos=init_pos,
threadCount=threadCount,
mpi=self._mpi,
print_key=print_key,
)
kwargs_result = param_class.args2kwargs(result, bijective=True)
return kwargs_result, chain, param_list
[docs]
def nested_sampling(
self,
sampler_type="dynesty",
kwargs_run={},
prior_type="uniform",
width_scale=1,
sigma_scale=1,
output_basename="chain",
remove_output_dir=True,
dypolychord_dynamic_goal=0.8,
polychord_settings={},
dypolychord_seed_increment=200,
output_dir="nested_sampling_chains",
dynesty_bound="multi",
dynesty_sample="auto",
):
"""Run (Dynamic) Nested Sampling algorithms, depending on the type of algorithm.
:param sampler_type: 'MULTINEST', 'DYPOLYCHORD', 'DYNESTY'
:param kwargs_run: keywords passed to the core sampling method
:param prior_type: 'uniform' of
'gaussian', for converting the unit hypercube to param cube :param width_scale:
scale the width (lower/upper limits) of the parameters space by this factor
:param sigma_scale: if prior_type is 'gaussian', scale the gaussian sigma by
this factor
:param output_basename: name of the folder in which the core
MultiNest/PolyChord code will save output files
:param remove_output_dir: if True, the above folder is removed after completion
:param dypolychord_dynamic_goal: dynamic goal for DyPolyChord (trade-off between
evidence (0) and posterior (1) computation) :param polychord_settings: settings
dictionary to send to pypolychord. Check dypolychord documentation for details.
:param dypolychord_seed_increment: seed increment for dypolychord with MPI.
Check dypolychord documentation for details.
:param dynesty_bound: see https://dynesty.readthedocs.io
:param sampler_type: 'MULTINEST', 'DYPOLYCHORD', 'DYNESTY'
:param kwargs_run: keywords passed to the core sampling method
:param prior_type: 'uniform' of 'gaussian', for converting the unit hypercube to
param cube
:param width_scale: scale the width (lower/upper limits) of the parameters space
by this factor
:param sigma_scale: if prior_type is 'gaussian', scale the gaussian sigma by
this factor
:param output_basename: name of the folder in which the core MultiNest/PolyChord
code will save output files
:param remove_output_dir: if True, the above folder is removed after completion
:param dypolychord_dynamic_goal: dynamic goal for DyPolyChord (trade-off between
evidence (0) and posterior (1) computation)
:param polychord_settings: settings dictionary to send to pypolychord. Check
dypolychord documentation for details.
:param dypolychord_seed_increment: seed increment for dypolychord with MPI.
Check dypolychord documentation for details.
:param dynesty_bound: see https://dynesty.readthedocs.io for details
:param dynesty_sample: see https://dynesty.readthedocs.io for details
:return: list of output arguments : samples, mean inferred values, log-
likelihood, log-evidence, error on log-evidence for each sample
"""
mean_start, sigma_start = self._prepare_sampling(prior_type)
if sampler_type == "dyPolyChord":
if "resume_dyn_run" in kwargs_run and kwargs_run["resume_dyn_run"] is True:
resume_dyn_run = True
else:
resume_dyn_run = False
sampler = DyPolyChordSampler(
self.likelihoodModule,
prior_type=prior_type,
prior_means=mean_start,
prior_sigmas=sigma_start,
width_scale=width_scale,
sigma_scale=sigma_scale,
output_dir=output_dir,
output_basename=output_basename,
polychord_settings=polychord_settings,
remove_output_dir=remove_output_dir,
resume_dyn_run=resume_dyn_run,
use_mpi=self._mpi,
)
samples, means, logZ, logZ_err, logL, results_object = sampler.run(
dypolychord_dynamic_goal, kwargs_run
)
elif sampler_type == "MultiNest":
sampler = MultiNestSampler(
self.likelihoodModule,
prior_type=prior_type,
prior_means=mean_start,
prior_sigmas=sigma_start,
width_scale=width_scale,
sigma_scale=sigma_scale,
output_dir=output_dir,
output_basename=output_basename,
remove_output_dir=remove_output_dir,
use_mpi=self._mpi,
)
samples, means, logZ, logZ_err, logL, results_object = sampler.run(
kwargs_run
)
else:
sampler = DynestySampler(
self.likelihoodModule,
prior_type=prior_type,
prior_means=mean_start,
prior_sigmas=sigma_start,
width_scale=width_scale,
sigma_scale=sigma_scale,
bound=dynesty_bound,
sample=dynesty_sample,
use_mpi=self._mpi,
)
samples, means, logZ, logZ_err, logL, results_object = sampler.run(
kwargs_run
)
# update current best fit values
self._update_state(samples[-1])
output = [
sampler_type,
samples,
sampler.param_names,
logL,
logZ,
logZ_err,
results_object,
]
return output
[docs]
def psf_iteration(self, compute_bands=None, **kwargs_psf_iter):
"""Iterative PSF reconstruction.
:param compute_bands: bool list, if multiple bands, this process can be limited
to a subset of bands
:param kwargs_psf_iter: keyword arguments as used or available in
PSFIteration.update_iterative() definition
:return: 0, updated PSF is stored in self.multi_band_list
"""
kwargs_model = self._updateManager.kwargs_model
kwargs_likelihood = self._updateManager.kwargs_likelihood
likelihood_mask_list = kwargs_likelihood.get("image_likelihood_mask_list", None)
kwargs_pixelbased = kwargs_likelihood.get("kwargs_pixelbased", None)
kwargs_temp = self.best_fit(bijective=False)
if compute_bands is None:
compute_bands = [True] * len(self.multi_band_list)
for band_index in range(len(self.multi_band_list)):
if compute_bands[band_index] is True:
kwargs_psf = self.multi_band_list[band_index][1]
image_model = SingleBandMultiModel(
self.multi_band_list,
kwargs_model,
likelihood_mask_list=likelihood_mask_list,
band_index=band_index,
kwargs_pixelbased=kwargs_pixelbased,
)
psf_iter = PsfFitting(image_model_class=image_model)
kwargs_psf = psf_iter.update_iterative(
kwargs_psf, kwargs_params=kwargs_temp, **kwargs_psf_iter
)
self.multi_band_list[band_index][1] = kwargs_psf
return 0
[docs]
def align_images(
self,
n_particles=10,
n_iterations=10,
align_offset=True,
align_rotation=False,
threadCount=1,
compute_bands=None,
delta_shift=0.2,
delta_rot=0.1,
):
"""Aligns the coordinate systems of different exposures within a fixed model
parameterisation by executing a PSO with relative coordinate shifts as free
parameters.
:param n_particles: number of particles in the Particle Swarm Optimization
:param n_iterations: number of iterations in the optimization process
:param align_offset: aligns shift in Ra and Dec
:type align_offset: boolean
:param align_rotation: aligns coordinate rotation
:type align_rotation: boolean
:param delta_shift: astrometric shift tolerance
:param delta_rot: rotation angle tolerance [in radian]
:param compute_bands: bool list, if multiple bands, this process can be limited
to a subset of bands for which the coordinate system is being fit for best
alignment to the model parameters
:return: 0, updated coordinate system for the band(s)
"""
kwargs_model = self._updateManager.kwargs_model
kwargs_likelihood = self._updateManager.kwargs_likelihood
likelihood_mask_list = kwargs_likelihood.get("image_likelihood_mask_list", None)
kwargs_temp = self.best_fit(bijective=False)
if compute_bands is None:
compute_bands = [True] * len(self.multi_band_list)
for i in range(len(self.multi_band_list)):
if compute_bands[i] is True:
alignmentFitting = AlignmentFitting(
self.multi_band_list,
kwargs_model,
kwargs_temp,
band_index=i,
likelihood_mask_list=likelihood_mask_list,
align_offset=align_offset,
align_rotation=align_rotation,
)
kwargs_data, chain = alignmentFitting.pso(
n_particles=n_particles,
n_iterations=n_iterations,
delta_shift=delta_shift,
delta_rot=delta_rot,
threadCount=threadCount,
mpi=self._mpi,
print_key="Alignment fitting for band %s ..." % i,
)
print("Align completed for band %s." % i)
print(
"ra_shift: %s, dec_shift: %s, phi_rot: %s"
% (
kwargs_data.get("ra_shift", 0),
kwargs_data.get("dec_shift", 0),
kwargs_data.get("phi_rot", 0),
)
)
self.multi_band_list[i][0] = kwargs_data
return 0
[docs]
def flux_calibration(
self,
n_particles=10,
n_iterations=10,
threadCount=1,
calibrate_bands=None,
scaling_lower_limit=0,
scaling_upper_limit=1000,
):
"""Calibrates flux_scaling between multiple images. This routine only works in
'join-linear' model when fluxes are meant to be identical for different bands.
:param n_particles: number of particles in the Particle Swarm Optimization
:param n_iterations: number of iterations in the optimization process
:param calibrate_bands: state which bands the flux calibration is applied to
:type calibrate_bands: list of booleans of length of the imaging bands
:param threadCount: number of CPU threads. If MPI option is set, threadCount=1
:type threadCount: integer
:param scaling_lower_limit: lower limit of flux_scaling
:param scaling_upper_limit: upper limit of flux scaling
:return: 0, updated coordinate system for the band(s)
"""
kwargs_model = self._updateManager.kwargs_model
kwargs_temp = self.best_fit(bijective=False)
multi_band_type = self.kwargs_data_joint.get("multi_band_type", "multi-linear")
kwargs_imaging = self.likelihoodModule.kwargs_imaging
calibration_fitting = FluxCalibration(
kwargs_imaging=kwargs_imaging,
kwargs_model=kwargs_model,
kwargs_params=kwargs_temp,
calibrate_bands=calibrate_bands,
)
multi_band_list, chain = calibration_fitting.pso(
n_particles=n_particles,
n_iterations=n_iterations,
threadCount=threadCount,
mpi=self._mpi,
scaling_lower_limit=scaling_lower_limit,
scaling_upper_limit=scaling_upper_limit,
)
self.multi_band_list = multi_band_list
return 0
[docs]
def update_settings(
self,
kwargs_model=None,
kwargs_constraints=None,
kwargs_likelihood=None,
lens_add_fixed=None,
source_add_fixed=None,
lens_light_add_fixed=None,
ps_add_fixed=None,
special_add_fixed=None,
tracer_source_add_fixed=None,
lens_remove_fixed=None,
source_remove_fixed=None,
lens_light_remove_fixed=None,
ps_remove_fixed=None,
special_remove_fixed=None,
tracer_source_remove_fixed=None,
change_source_lower_limit=None,
change_source_upper_limit=None,
change_lens_lower_limit=None,
change_lens_upper_limit=None,
change_sigma_lens=None,
change_sigma_source=None,
change_sigma_lens_light=None,
):
"""Updates lenstronomy settings "on the fly".
:param kwargs_model: kwargs, specified keyword arguments overwrite the existing ones
:param kwargs_constraints: kwargs, specified keyword arguments overwrite the existing ones
:param kwargs_likelihood: kwargs, specified keyword arguments overwrite the existing ones
:param lens_add_fixed: [[i_model, ['param1', 'param2',...], [...]]
:param source_add_fixed: [[i_model, ['param1', 'param2',...], [...]]
:param lens_light_add_fixed: [[i_model, ['param1', 'param2',...], [...]]
:param ps_add_fixed: [[i_model, ['param1', 'param2',...], [...]]
:param special_add_fixed: ['param1', 'param2',...]
:param special_add_fixed: ['param1', 'param2',...]
:param tracer_source_add_fixed: [[i_model, ['param1', 'param2',...], [...]]
:param lens_remove_fixed: [[i_model, ['param1', 'param2',...], [...]]
:param source_remove_fixed: [[i_model, ['param1', 'param2',...], [...]]
:param lens_light_remove_fixed: [[i_model, ['param1', 'param2',...], [...]]
:param ps_remove_fixed: [[i_model, ['param1', 'param2',...], [...]]
:param special_remove_fixed: ['param1', 'param2',...]
:param special_remove_fixed: ['param1', 'param2',...]
:param tracer_source_remove_fixed: [[i_model, ['param1', 'param2',...], [...]]
:param change_lens_lower_limit: [[i_model, ['param_name1', 'param_name2', ...], [value1, value2, ...]]]
:param change_lens_upper_limit: [[i_model, ['param_name1', 'param_name2', ...], [value1, value2, ...]]]
:param change_source_lower_limit: [[i_model, ['param_name1', 'param_name2', ...], [value1, value2, ...]]]
:param change_source_upper_limit: [[i_model, [''param_name1', 'param_name2', ...], [value1, value2, ...]]]
:param change_sigma_lens: [[i_model, ['param_name1', 'param_name2', ...], [value1, value2, ...]]]
:param change_sigma_source: [[i_model, ['param_name1', 'param_name2', ...], [value1, value2, ...]]]
:param change_sigma_lens_light: [[i_model, ['param_name1', 'param_name2', ...], [value1, value2, ...]]]
:return: 0, the settings are overwritten for the next fitting step to come
"""
self._updateManager.update_options(
kwargs_model, kwargs_constraints, kwargs_likelihood
)
self._updateManager.update_fixed(
lens_add_fixed,
source_add_fixed,
lens_light_add_fixed,
ps_add_fixed,
special_add_fixed,
tracer_source_add_fixed,
lens_remove_fixed,
source_remove_fixed,
lens_light_remove_fixed,
ps_remove_fixed,
special_remove_fixed,
tracer_source_remove_fixed,
)
self._updateManager.update_limits(
change_source_lower_limit,
change_source_upper_limit,
change_lens_lower_limit,
change_lens_upper_limit,
)
self._updateManager.update_sigmas(
change_sigma_lens=change_sigma_lens,
change_sigma_source=change_sigma_source,
change_sigma_lens_light=change_sigma_lens_light,
)
return 0
[docs]
def set_param_value(self, **kwargs):
"""Set a parameter to a specific value. `kwargs` are below.
:param lens: [[i_model, ['param1', 'param2',...], [...]]
:type lens:
:param source: [[i_model, ['param1', 'param2',...], [...]]
:type source:
:param lens_light: [[i_model, ['param1', 'param2',...], [...]]
:type lens_light:
:param ps: [[i_model, ['param1', 'param2',...], [...]]
:type ps:
:return: 0, the value of the param is overwritten
:rtype:
"""
self._updateManager.update_param_value(**kwargs)
[docs]
def fix_not_computed(self, free_bands):
"""Fixes lens model parameters of imaging bands/frames that are not computed and
frees the parameters of the other lens models to the initial kwargs_fixed
options.
:param free_bands: bool list of length of imaging bands in order of imaging
bands, if False: set fixed lens model
:return: None
"""
self._updateManager.fix_not_computed(free_bands=free_bands)
def _prepare_sampling(self, prior_type):
if prior_type == "gaussian":
mean_start = self.param_class.kwargs2args(
**self._updateManager.parameter_state
)
sigma_start = self.param_class.kwargs2args(
**self._updateManager.sigma_kwargs
)
mean_start = np.array(mean_start)
sigma_start = np.array(sigma_start)
else:
mean_start, sigma_start = None, None
return mean_start, sigma_start
def _update_state(self, result):
"""
:param result: array of parameters being sampled (e.g. result of MCMC chain)
:return: None, updates the parameter state of the class instance
"""
kwargs_result = self.param_class.args2kwargs(result, bijective=True)
self._updateManager.update_param_state(**kwargs_result)
def _result_from_mcmc(self, mcmc_output):
"""
:param mcmc_output: list returned by self.mcmc()
:return: kwargs_result like returned by self.pso(), from best logL MCMC sample
"""
_, samples, _, logL_values = mcmc_output
return self.best_fit_from_samples(samples, logL_values)
[docs]
def best_fit_from_samples(self, samples, logl):
"""Return best fit (max likelihood) value of samples in lenstronomy conventions.
:param samples: samples of multi-dimensional parameter space
:param logl: likelihood values for each sample
:return: kwargs_result in lenstronomy convention
"""
# get index of best logL sample
best_fit_index = np.argmax(logl)
best_fit_sample = samples[best_fit_index, :]
best_fit_result = best_fit_sample.tolist()
# get corresponding kwargs
kwargs_result = self.param_class.args2kwargs(best_fit_result, bijective=True)
return kwargs_result