__author__ = "sibirrer"
import time
import copy
from lenstronomy.Sampling.Pool.pool import choose_pool
from lenstronomy.Sampling.Likelihoods.image_likelihood import ImageLikelihood
from lenstronomy.Sampling.Samplers.pso import ParticleSwarmOptimizer
__all__ = ["FluxCalibration", "CalibrationLikelihood"]
[docs]
class FluxCalibration(object):
"""Class to fit coordinate system alignment and flux amplitude calibrations."""
[docs]
def __init__(self, kwargs_imaging, kwargs_model, kwargs_params, calibrate_bands):
"""Initialise the classes of the chain and for parameter options for the flux
calibration fitting.
:param kwargs_imaging: keyword argument related to imaging data and imaging
likelihood. Feeds into ImageLikelihood(\*\*kwargs_imaging)
:param kwargs_model: keyword argument of model components
:param kwargs_params: keyword argument of model parameters
:param calibrate_bands: state which bands the flux calibration is applied to
:type calibrate_bands: list of booleans of length of the imaging bands
"""
multi_band_list = kwargs_imaging["multi_band_list"]
multi_band_type = kwargs_imaging["multi_band_type"]
if calibrate_bands is None:
calibrate_bands = [False] * len(multi_band_list)
if multi_band_type != "joint-linear":
raise ValueError(
"flux calibration should only be done with join-linear data model!"
)
self._calibrate_bands = calibrate_bands
self.chain = CalibrationLikelihood(
kwargs_model,
kwargs_params,
calibrate_bands=calibrate_bands,
kwargs_imaging=kwargs_imaging,
)
[docs]
def pso(
self,
n_particles=10,
n_iterations=10,
threadCount=1,
mpi=False,
scaling_lower_limit=0,
scaling_upper_limit=1000,
print_key="flux calibration",
):
"""Returns the best fit for the lens model on catalogue basis with particle
swarm optimizer.
:param n_particles: number of particles in the PSO
:param n_iterations: number of iterations of the PSO
:param threadCount: number of threads
:param mpi: boolean, MPI mode
:param scaling_lower_limit: lower limit of the flux_scaling initialization
:param scaling_upper_limit: upper limit of the flux_scaling initialization
:param print_key: string, print statement
:return: multi_band_list, [chi2_list, pos_list, vel_list]
"""
init_pos = self.chain.get_args(self.chain.multi_band_list)
pool = choose_pool(mpi=mpi, processes=threadCount)
num_param = self.chain.num_param
lower_limit = [scaling_lower_limit] * num_param
upper_limit = [scaling_upper_limit] * num_param
pso = ParticleSwarmOptimizer(
self.chain, lower_limit, upper_limit, n_particles, pool=pool
)
pso.set_global_best(
init_pos, [0] * len(init_pos), self.chain.likelihood(init_pos)
)
if pool.is_master():
print("Computing the %s ..." % print_key)
time_start = time.time()
result, [chi2_list, pos_list, vel_list] = pso.optimize(n_iterations)
multi_band_list = self.chain.update_data(result)
if pool.is_master():
time_end = time.time()
print("parameters found: ", result)
print(time_end - time_start, "time used for ", print_key)
print("Calibration completed for bands %s." % self._calibrate_bands)
return multi_band_list, [chi2_list, pos_list, vel_list]
[docs]
class CalibrationLikelihood(object):
[docs]
def __init__(self, kwargs_model, kwargs_params, calibrate_bands, kwargs_imaging):
"""Initializes all the classes needed for the chain.
:param kwargs_model: keyword argument of model components
:param kwargs_params: keyword argument of model parameters
:param calibrate_bands: state which bands the flux calibration is applied to
:type calibrate_bands: list of booleans of length of the imaging bands
:param kwargs_imaging: keyword arguments of the imaging likelihood
"""
self._calibrate_bands = calibrate_bands
self._kwargs_model = kwargs_model
self._kwargs_params = kwargs_params
self._kwargs_imaging_likelihood = copy.deepcopy(kwargs_imaging)
self.multi_band_list = self._kwargs_imaging_likelihood["multi_band_list"]
def _likelihood(self, args):
"""Routine to compute X2 given variable parameters for a MCMC/PSO chainF."""
# generate image and computes likelihood
multi_band_list = self.update_data(args)
self._kwargs_imaging_likelihood["multi_band_list"] = multi_band_list
# this line is redundant since the self.multi_band_list variable got already updated
image_likelihood = ImageLikelihood(
kwargs_model=self._kwargs_model, **self._kwargs_imaging_likelihood
)
log_likelihood, _ = image_likelihood.logL(**self._kwargs_params)
return log_likelihood
def __call__(self, a):
return self._likelihood(a)
[docs]
def likelihood(self, a):
return self._likelihood(a)
[docs]
def update_data(self, args):
"""
:param args:
:return: updated multi_band_list
"""
k = 0
for i, band in enumerate(self.multi_band_list):
if self._calibrate_bands[i]:
kwargs_data = band[0]
kwargs_data["flux_scaling"] = args[k]
k += 1
return self.multi_band_list
[docs]
def get_args(self, multi_band_list):
"""
:param multi_band_list: list of multi_band [[kwargs_data, kwargs_psf, kwargs_numeric], [...], ...]
:return:
"""
args = []
for i, band in enumerate(multi_band_list):
if self._calibrate_bands[i]:
args.append(band[0].get("flux_scaling", 1))
return args
@property
def num_param(self):
n = 0
for i in range(len(self._calibrate_bands)):
if self._calibrate_bands[i]:
n += 1
return n