Source code for lenstronomy.LensModel.QuadOptimizer.optimizer

__author__ = "dgilman"

from scipy.optimize import minimize
import numpy as np
from lenstronomy.Sampling.Samplers.pso import ParticleSwarmOptimizer
from lenstronomy.LensModel.QuadOptimizer.multi_plane_fast import MultiplaneFast
from lenstronomy.Sampling.Pool.pool import choose_pool

__all__ = ["Optimizer"]


[docs] class Optimizer(object): """Class which executes the optimization routines. Currently implemented as a particle swarm optimization followed by a downhill simplex routine. Particle swarm optimizer is modified from the CosmoHammer particle swarm routine with different convergence criteria implemented. """
[docs] def __init__( self, x_image, y_image, lens_model_list, redshift_list, z_lens, z_source, parameter_class, astropy_instance=None, numerical_alpha_class=None, particle_swarm=True, re_optimize=False, re_optimize_scale=1.0, pso_convergence_mean=50000, foreground_rays=None, tol_source=1e-5, tol_simplex_func=1e-3, simplex_n_iterations=400, ): """ :param x_image: x_image to fit (should be length 4) :param y_image: y_image to fit (should be length 4) :param lens_model_list: list of lens models for the system :param redshift_list: list of lens redshifts for the system :param z_lens: the main deflector redshift, the lens models being optimizer must be at this redshift :param z_source: the source redshift :param parameter_class: an instance of ParamClass (see documentation in QuadOptimizer.param_manager) :param astropy_instance: an instance of astropy to pass to the lens model :param numerical_alpha_class: a class to compute numerical deflection angles to pass to the lens model :param particle_swarm: bool, whether or not to use a PSO fit first :param re_optimize: bool, if True the initial spread of particles will be very tight :param re_optimize_scale: float, controls how tight the initial spread of particles is :param pso_convergence_mean: when to terminate the PSO fit :param foreground_rays: (optional) can pass in pre-computed foreground light rays from a previous fit so as to not waste time recomputing them :param tol_source: sigma in the source plane chi^2 :param tol_simplex_func: tolerance for the downhill simplex optimization :param simplex_n_iterations: number of iterations per dimension for the downhill simplex optimization """ self.fast_rayshooting = MultiplaneFast( x_image, y_image, z_lens, z_source, lens_model_list, redshift_list, astropy_instance, parameter_class, foreground_rays, tol_source, numerical_alpha_class, ) self._tol_source = tol_source self._pso_convergence_mean = pso_convergence_mean self._param_class = parameter_class self._tol_simplex_func = tol_simplex_func self._simplex_n_iterations = simplex_n_iterations self._particle_swarm = particle_swarm self._re_optimize = re_optimize self._re_optimize_scale = re_optimize_scale
[docs] def optimize(self, n_particles=50, n_iterations=250, verbose=False, threadCount=1): """ :param n_particles: number of PSO particles, will be ignored if self._particle_swarm is False :param n_iterations: number of PSO iterations, will be ignored if self._particle_swarm is False :param verbose: whether to print stuff :param threadCount: integer; number of threads in multi-threading mode :return: keyword arguments that map (x_image, y_image) to the same source coordinate (source_x, source_y) """ if self._particle_swarm: if threadCount > 1: pool = choose_pool(mpi=False, processes=threadCount) else: pool = None kwargs = self._fit_pso(n_particles, n_iterations, pool, verbose) else: kwargs = self._param_class.kwargs_lens kwargs_lens_final, source_penalty = self._fit_amoeba(kwargs, verbose) args_lens_final = self._param_class.kwargs_to_args(kwargs_lens_final) source_x_array, source_y_array = self.fast_rayshooting.ray_shooting_fast( args_lens_final ) source_x, source_y = np.mean(source_x_array), np.mean(source_y_array) if verbose: print("optimization done.") print("Recovered source position: ", (source_x_array, source_y_array)) return kwargs_lens_final, [source_x, source_y]
def _fit_pso(self, n_particles, n_iterations, pool, verbose): """Executes the PSO.""" low_bounds, high_bounds = self._param_class.bounds( self._re_optimize, self._re_optimize_scale ) pso = ParticleSwarmOptimizer( self.fast_rayshooting.logL, low_bounds, high_bounds, n_particles, pool, args=[self._tol_source], ) best, info = pso.optimize( n_iterations, verbose, early_stop_tolerance=self._pso_convergence_mean ) if verbose: print("PSO done... ") print( "source plane chi^2: ", self.fast_rayshooting.source_plane_chi_square(best), ) print("total chi^2: ", self.fast_rayshooting.chi_square(best)) kwargs = self._param_class.args_to_kwargs(best) return kwargs def _fit_amoeba(self, kwargs, verbose): """Executes the downhill simplex.""" args_init = self._param_class.kwargs_to_args(kwargs) options = { "adaptive": True, "fatol": self._tol_simplex_func, "maxiter": self._simplex_n_iterations * len(args_init), } method = "Nelder-Mead" if verbose: print("starting amoeba... ") opt = minimize( self.fast_rayshooting.chi_square, x0=args_init, method=method, options=options, ) kwargs = self._param_class.args_to_kwargs(opt["x"]) source_penalty = opt["fun"] return kwargs, source_penalty