from scipy import fftpack, ndimage, signal
import numpy as np
import threading
import lenstronomy.Util.kernel_util as kernel_util
import lenstronomy.Util.util as util
import lenstronomy.Util.image_util as image_util
from lenstronomy.Util.package_util import exporter
export, __all__ = exporter()
_rfft_mt_safe = True # (NumpyVersion(np.__version__) >= '1.9.0.dev-e24486e')
_rfft_lock = threading.Lock()
def _centered(arr, newshape):
# Return the center newshape portion of the array.
newshape = np.asarray(newshape)
currshape = np.array(arr.shape)
startind = (currshape - newshape) // 2
endind = startind + newshape
myslice = [slice(startind[k], endind[k]) for k in range(len(endind))]
return arr[tuple(myslice)]
[docs]
@export
class PixelKernelConvolution(object):
"""Class to compute convolutions for a given pixelized kernel (fft, grid)"""
[docs]
def __init__(self, kernel, convolution_type="fft_static"):
"""
:param kernel: 2d array, convolution kernel
:param convolution_type: string, 'fft', 'grid', 'fft_static' mode of 2d convolution
"""
self._kernel = kernel
if convolution_type not in ["fft", "grid", "fft_static"]:
raise ValueError("convolution_type %s not supported!" % convolution_type)
self._type = convolution_type
self._pre_computed = False
[docs]
def pixel_kernel(self, num_pix=None):
"""Access pixelated kernel.
:param num_pix: size of returned kernel (odd number per axis). If None, return
the original kernel.
:return: pixel kernel centered
"""
if num_pix is not None:
return kernel_util.cut_psf(self._kernel, num_pix)
return self._kernel
[docs]
def copy_transpose(self):
"""
:return: copy of the class with kernel set to the transpose of original one
"""
return PixelKernelConvolution(self._kernel.T, convolution_type=self._type)
[docs]
def convolution2d(self, image):
"""
:param image: 2d array (image) to be convolved
:return: fft convolution
"""
if self._type == "fft":
image_conv = signal.fftconvolve(image, self._kernel, mode="same")
elif self._type == "fft_static":
image_conv = self._static_fft(image, mode="same")
elif self._type == "grid":
image_conv = signal.convolve2d(image, self._kernel, mode="same")
else:
raise ValueError("convolution_type %s not supported!" % self._type)
return image_conv
def _static_fft(self, image, mode="same"):
"""Scipy fft convolution with saved static fft kernel.
:param image: 2d numpy array to be convolved
:return:
"""
in1 = image
in1 = np.asarray(in1)
if self._pre_computed is False:
(
self._s1,
self._s2,
self._complex_result,
self._shape,
self._fshape,
self._fslice,
self._sp2,
) = self._static_pre_compute(image)
self._pre_computed = True
s1, s2, complex_result, shape, fshape, fslice, sp2 = (
self._s1,
self._s2,
self._complex_result,
self._shape,
self._fshape,
self._fslice,
self._sp2,
)
# if in1.ndim == in2.ndim == 0: # scalar inputs
# return in1 * in2
# elif not in1.ndim == in2.ndim:
# raise ValueError("in1 and in2 should have the same dimensionality")
# elif in1.size == 0 or in2.size == 0: # empty arrays
# return np.array([])
# Check that input sizes are compatible with 'valid' mode
# if _inputs_swap_needed(mode, s1, s2):
# Convolution is commutative; order doesn't have any effect on output
# only applicable for 'valid' mode
# in1, s1, in2, s2 = in2, s2, in1, s1
# Pre-1.9 NumPy FFT routines are not threadsafe. For older NumPys, make
# sure we only call rfftn/irfftn from one thread at a time.
if not complex_result and (_rfft_mt_safe or _rfft_lock.acquire(False)):
try:
sp1 = np.fft.rfftn(in1, fshape)
ret = np.fft.irfftn(sp1 * sp2, fshape)[fslice].copy()
finally:
if not _rfft_mt_safe:
_rfft_lock.release()
else:
# If we're here, it's either because we need a complex result, or we
# failed to acquire _rfft_lock (meaning rfftn isn't threadsafe and
# is already in use by another thread). In either case, use the
# (threadsafe but slower) SciPy complex-FFT routines instead.
sp1 = fftpack.fftn(in1, fshape)
ret = fftpack.ifftn(sp1 * sp2)[fslice].copy()
if not complex_result:
ret = ret.real
if mode == "full":
return ret
elif mode == "same":
return _centered(ret, s1)
elif mode == "valid":
return _centered(ret, s1 - s2 + 1)
else:
raise ValueError("Acceptable mode flags are 'valid'," " 'same', or 'full'.")
def _static_pre_compute(self, image):
"""Pre-compute Fourier transformed kernel and shape quantities to speed up
convolution.
:param image: 2d numpy array
:return:
"""
in1 = image
in2 = self._kernel
s1 = np.array(in1.shape)
s2 = np.array(in2.shape)
complex_result = np.issubdtype(in1.dtype, np.complexfloating) or np.issubdtype(
in2.dtype, np.complexfloating
)
shape = s1 + s2 - 1
# Check that input sizes are compatible with 'valid' mode
# if _inputs_swap_needed(mode, s1, s2):
# Convolution is commutative; order doesn't have any effect on output
# only applicable for 'valid' mode
# in1, s1, in2, s2 = in2, s2, in1, s1
# Speed up FFT by padding to optimal size for FFTPACK
fshape = [fftpack.next_fast_len(int(d)) for d in shape]
fslice = tuple([slice(0, int(sz)) for sz in shape])
# Pre-1.9 NumPy FFT routines are not threadsafe. For older NumPys, make
# sure we only call rfftn/irfftn from one thread at a time.
if not complex_result and (_rfft_mt_safe or _rfft_lock.acquire(False)):
try:
sp2 = np.fft.rfftn(in2, fshape)
finally:
if not _rfft_mt_safe:
_rfft_lock.release()
else:
# If we're here, it's either because we need a complex result, or we
# failed to acquire _rfft_lock (meaning rfftn isn't threadsafe and
# is already in use by another thread). In either case, use the
# (threadsafe but slower) SciPy complex-FFT routines instead.
sp2 = fftpack.fftn(in2, fshape)
return s1, s2, complex_result, shape, fshape, fslice, sp2
[docs]
def re_size_convolve(self, image_low_res, image_high_res=None):
"""
:param image_low_res: regular sampled image/model
:param image_high_res: supersampled image/model to be convolved on a regular pixel grid
:return: convolved and re-sized image
"""
return self.convolution2d(image_low_res)
[docs]
@export
class SubgridKernelConvolution(object):
"""Class to compute the convolution on a supersampled grid with partial convolution
computed on the regular grid."""
[docs]
def __init__(
self,
kernel_supersampled,
supersampling_factor,
supersampling_kernel_size=None,
convolution_type="fft_static",
):
"""
:param kernel_supersampled: kernel in supersampled pixels
:param supersampling_factor: supersampling factor relative to the image pixel grid
:param supersampling_kernel_size: number of pixels (in units of the image pixels) that are convolved with the
supersampled kernel
"""
# n_high = len(kernel_supersampled)
self._supersampling_factor = supersampling_factor
# numPix = int(n_high / self._supersampling_factor)
# if self._supersampling_factor % 2 == 0:
# self._kernel = kernel_util.averaging_even_kernel(kernel_supersampled, self._supersampling_factor)
# else:
# self._kernel = util.averaging(kernel_supersampled, numGrid=n_high, numPix=numPix)
if supersampling_kernel_size is None:
kernel_low_res, kernel_high_res = np.zeros((3, 3)), kernel_supersampled
self._low_res_convolution = False
else:
kernel_low_res, kernel_high_res = kernel_util.split_kernel(
kernel_supersampled,
supersampling_kernel_size,
self._supersampling_factor,
)
self._low_res_convolution = True
self._low_res_conv = PixelKernelConvolution(
kernel_low_res, convolution_type=convolution_type
)
self._high_res_conv = PixelKernelConvolution(
kernel_high_res, convolution_type=convolution_type
)
[docs]
def convolution2d(self, image):
"""
:param image: 2d array (high resoluton image) to be convolved and re-sized
:return: convolved image
"""
image_high_res_conv = self._high_res_conv.convolution2d(image)
image_resized_conv = image_util.re_size(
image_high_res_conv, self._supersampling_factor
)
if self._low_res_convolution is True:
image_resized = image_util.re_size(image, self._supersampling_factor)
image_resized_conv += self._low_res_conv.convolution2d(image_resized)
return image_resized_conv
[docs]
def re_size_convolve(self, image_low_res, image_high_res):
"""
:param image_high_res: supersampled image/model to be convolved on a regular pixel grid
:return: convolved and re-sized image
"""
image_high_res_conv = self._high_res_conv.convolution2d(image_high_res)
image_resized_conv = image_util.re_size(
image_high_res_conv, self._supersampling_factor
)
if self._low_res_convolution is True:
image_resized_conv += self._low_res_conv.convolution2d(image_low_res)
return image_resized_conv
[docs]
@export
class MultiGaussianConvolution(object):
"""Class to perform a convolution consisting of multiple 2d Gaussians This is aimed
to lead to a speed-up without significant loss of accuracy do to the simplified
convolution kernel relative to a pixelized kernel."""
[docs]
def __init__(
self,
sigma_list,
fraction_list,
pixel_scale,
supersampling_factor=1,
supersampling_convolution=False,
truncation=2,
):
"""
:param sigma_list: list of std value of Gaussian kernel
:param fraction_list: fraction of flux to be convoled with each Gaussian kernel
:param pixel_scale: scale of pixel width (to convert sigmas into units of pixels)
:param truncation: float. Truncate the filter at this many standard deviations.
Default is 4.0.
"""
self._num_gaussians = len(sigma_list)
self._sigmas_scaled = np.array(sigma_list) / pixel_scale
if supersampling_convolution is True:
self._sigmas_scaled *= supersampling_factor
self._fraction_list = fraction_list / np.sum(fraction_list)
assert len(self._sigmas_scaled) == len(self._fraction_list)
self._truncation = truncation
self._pixel_scale = pixel_scale
self._supersampling_factor = supersampling_factor
self._supersampling_convolution = supersampling_convolution
[docs]
def convolution2d(self, image):
"""2d convolution.
:param image: 2d numpy array, image to be convolved
:return: convolved image, 2d numpy array
"""
image_conv = None
for i in range(self._num_gaussians):
if image_conv is None:
image_conv = (
ndimage.gaussian_filter(
image,
self._sigmas_scaled[i],
mode="nearest",
truncate=self._truncation,
)
* self._fraction_list[i]
)
else:
image_conv += (
ndimage.gaussian_filter(
image,
self._sigmas_scaled[i],
mode="nearest",
truncate=self._truncation,
)
* self._fraction_list[i]
)
return image_conv
[docs]
def re_size_convolve(self, image_low_res, image_high_res):
"""
:param image_high_res: supersampled image/model to be convolved on a regular pixel grid
:return: convolved and re-sized image
"""
if self._supersampling_convolution is True:
image_high_res_conv = self.convolution2d(image_high_res)
image_resized_conv = image_util.re_size(
image_high_res_conv, self._supersampling_factor
)
else:
image_resized_conv = self.convolution2d(image_low_res)
return image_resized_conv
[docs]
def pixel_kernel(self, num_pix):
"""Computes a pixelized kernel from the MGE parameters.
:param num_pix: int, size of kernel (odd number per axis)
:return: pixel kernel centered
"""
from lenstronomy.LightModel.Profiles.gaussian import MultiGaussian
mg = MultiGaussian()
x, y = util.make_grid(numPix=num_pix, deltapix=self._pixel_scale)
kernel = mg.function(x, y, amp=self._fraction_list, sigma=self._sigmas_scaled)
kernel = util.array2image(kernel)
return kernel / np.sum(kernel)
[docs]
@export
class FWHMGaussianConvolution(object):
"""Uses a two-dimensional Gaussian function with same FWHM of given kernel as
approximation."""
[docs]
def __init__(self, kernel, truncation=4):
"""
:param kernel: 2d kernel
:param truncation: sigma scaling of kernel truncation
"""
fwhm = kernel_util.fwhm_kernel(kernel)
self._sigma = util.fwhm2sigma(fwhm)
self._truncation = truncation
[docs]
def convolution2d(self, image):
"""2d convolution.
:param image: 2d numpy array, image to be convolved
:return: convolved image, 2d numpy array
"""
image_conv = ndimage.filters.gaussian_filter(
image, self._sigma, mode="nearest", truncate=self._truncation
)
return image_conv
[docs]
@export
class MGEConvolution(object):
"""Approximates a 2d kernel with an azimuthal Multi-Gaussian expansion."""
[docs]
def __init__(self, kernel, pixel_scale, order=1):
"""
:param kernel: 2d convolution kernel (centered, odd axis number)
:param order: order of Multi-Gaussian Expansion
"""
amps, sigmas, norm = kernel_util.mge_kernel(kernel, order=order)
# make instance o MultiGaussian convolution kernel
self._mge_conv = MultiGaussianConvolution(
sigma_list=sigmas * pixel_scale,
fraction_list=np.array(amps) / np.sum(amps),
pixel_scale=pixel_scale,
truncation=4,
)
self._kernel = kernel
# store difference between MGE approximation and real kernel
[docs]
def convolution2d(self, image):
"""
:param image:
:return:
"""
return self._mge_conv.convolution2d(image)
[docs]
def kernel_difference(self):
"""
:return: difference between true kernel and MGE approximation
"""
kernel_mge = self._mge_conv.pixel_kernel(num_pix=len(self._kernel))
return self._kernel - kernel_mge