Source code for lenstronomy.Util.kernel_util

"""Routines that manipulate convolution kernels."""
import numpy as np
import copy
from scipy import ndimage
import lenstronomy.Util.util as util
import lenstronomy.Util.image_util as image_util
from lenstronomy.LightModel.Profiles.gaussian import Gaussian
from lenstronomy.LightModel.Profiles.moffat import Moffat
import lenstronomy.Util.multi_gauss_expansion as mge
from lenstronomy.GalKin import velocity_util

from lenstronomy.Util.package_util import exporter

export, __all__ = exporter()


[docs] @export def de_shift_kernel(kernel, shift_x, shift_y, iterations=20, fractional_step_size=1): """De-shifts a shifted kernel to the center of a pixel. This is performed iteratively. The input kernel is the solution of a linear interpolated shift of a sharper kernel centered in the middle of the pixel. To find the de-shifted kernel, we perform an iterative correction of proposed de-shifted kernels and compare its shifted version with the input kernel. :param kernel: (shifted) kernel, e.g. a star in an image that is not centered in the pixel grid :param shift_x: x-offset relative to the center of the pixel (sub-pixel shift) :param shift_y: y-offset relative to the center of the pixel (sub-pixel shift) :param iterations: number of repeated iterations of shifting a new de-shifted kernel and apply corrections :param fractional_step_size: float (0, 1] correction factor relative to previous proposal (can be used for stability :return: de-shifted kernel such that the interpolated shift boy (shift_x, shift_y) results in the input kernel """ nx, ny = np.shape(kernel) kernel_new = ( np.zeros((nx + 2, ny + 2)) + (kernel[0, 0] + kernel[0, -1] + kernel[-1, 0] + kernel[-1, -1]) / 4.0 ) kernel_new[1:-1, 1:-1] = kernel int_shift_x = int(round(shift_x)) frac_x_shift = shift_x - int_shift_x int_shift_y = int(round(shift_y)) frac_y_shift = shift_y - int_shift_y kernel_init = copy.deepcopy(kernel_new) kernel_init_shifted = copy.deepcopy( ndimage.shift(kernel_init, shift=[int_shift_y, int_shift_x], order=1) ) kernel_new = ndimage.shift(kernel_new, shift=[int_shift_y, int_shift_x], order=1) norm = np.sum(kernel_init_shifted) for i in range(iterations): kernel_shifted_inv = ndimage.shift( kernel_new, shift=[-frac_y_shift, -frac_x_shift], order=1 ) delta = kernel_init_shifted - kernel_norm(kernel_shifted_inv) * norm kernel_new += delta * fractional_step_size kernel_new = kernel_norm(kernel_new) * norm return kernel_new[1:-1, 1:-1]
[docs] @export def center_kernel(kernel, iterations=20): """Given a kernel that might not be perfectly centered, this routine computes its light weighted center and then moves the center in an iterative process such that it is centered. :param kernel: 2d array (odd numbers) :param iterations: int, number of iterations :return: centered kernel """ kernel = kernel_norm(kernel) nx, ny = np.shape(kernel) if nx % 2 == 0: raise ValueError("kernel needs odd number of pixels") # make coordinate grid of kernel x_grid, y_grid = util.make_grid(nx, deltapix=1, left_lower=False) # compute 1st moments to get light weighted center x_w = np.sum(kernel * util.array2image(x_grid)) y_w = np.sum(kernel * util.array2image(y_grid)) # de-shift kernel kernel_centered = de_shift_kernel( kernel, shift_x=-x_w, shift_y=-y_w, iterations=iterations ) return kernel_norm(kernel_centered)
[docs] @export def kernel_norm(kernel): """ :param kernel: :return: normalisation of the psf kernel """ norm = np.sum(np.array(kernel)) kernel /= norm return kernel
[docs] @export def subgrid_kernel(kernel, subgrid_res, odd=False, num_iter=100): """Creates a higher resolution kernel with subgrid resolution as an interpolation of the original kernel in an iterative approach. :param kernel: initial kernel :type kernel: 2d numpy array with square odd size :param subgrid_res: subgrid resolution required :type subgrid_res: integer :param odd: forces odd axis size return (-1 in size if even) :type odd: boolean :param num_iter: number of iterations in the de-shifting and enhancement :type num_iter: integer :return: kernel with higher resolution (larger) :rtype: 2d numpy array with n x subgrid size (-1 if result is even and odd=True) """ subgrid_res = int(subgrid_res) if subgrid_res == 1: return kernel nx, ny = np.shape(kernel) d_x = 1.0 / nx x_in = np.linspace(d_x / 2, 1 - d_x / 2, nx) d_y = 1.0 / nx y_in = np.linspace(d_y / 2, 1 - d_y / 2, ny) nx_new = nx * subgrid_res ny_new = ny * subgrid_res if odd is True: if nx_new % 2 == 0: nx_new -= 1 if ny_new % 2 == 0: ny_new -= 1 d_x_new = 1.0 / nx_new d_y_new = 1.0 / ny_new x_out = np.linspace(d_x_new / 2.0, 1 - d_x_new / 2.0, nx_new) y_out = np.linspace(d_y_new / 2.0, 1 - d_y_new / 2.0, ny_new) kernel_input = copy.deepcopy(kernel) kernel_subgrid = image_util.re_size_array(x_in, y_in, kernel_input, x_out, y_out) kernel_subgrid = kernel_norm(kernel_subgrid) for i in range(max(num_iter, 1)): # given a proposition, re-size it to original pixel size if subgrid_res % 2 == 0: kernel_pixel = averaging_even_kernel(kernel_subgrid, subgrid_res) else: kernel_pixel = util.averaging(kernel_subgrid, numGrid=nx_new, numPix=nx) delta = kernel - kernel_pixel temp_kernel = kernel_input + delta kernel_subgrid = image_util.re_size_array( x_in, y_in, temp_kernel, x_out, y_out ) # /norm_subgrid kernel_subgrid = kernel_norm(kernel_subgrid) kernel_input = temp_kernel # from scipy.ndimage import zoom # ratio = subgrid_res # kernel_subgrid = zoom(kernel, ratio, order=4) / ratio ** 2 # print(np.shape(kernel_subgrid)) # whatever has not been matched is added to zeroth order (in squares of the undersampled PSF) if subgrid_res % 2 == 0: return kernel_subgrid kernel_pixel = util.averaging(kernel_subgrid, numGrid=nx_new, numPix=nx) kernel_pixel = kernel_norm(kernel_pixel) delta_kernel = kernel_pixel - kernel_norm(kernel) id = np.ones((subgrid_res, subgrid_res)) delta_kernel_sub = np.kron(delta_kernel, id) / subgrid_res**2 return kernel_norm(kernel_subgrid - delta_kernel_sub)
[docs] @export def kernel_pixelsize_change(kernel, deltaPix_in, deltaPix_out): """Change the pixel size of a given kernel. :param kernel: :param deltaPix_in: :param deltaPix_out: :return: """ numPix = len(kernel) numPix_new = int(round(numPix * deltaPix_in / deltaPix_out)) if numPix_new % 2 == 0: numPix_new -= 1 x_in = np.linspace( -(numPix - 1) / 2 * deltaPix_in, (numPix - 1) / 2 * deltaPix_in, numPix ) x_out = np.linspace( -(numPix_new - 1) / 2 * deltaPix_out, (numPix_new - 1) / 2 * deltaPix_out, numPix_new, ) kernel_out = image_util.re_size_array(x_in, x_in, kernel, x_out, x_out) kernel_out = kernel_norm(kernel_out) return kernel_out
[docs] @export def cut_psf(psf_data, psf_size, normalisation=True): """Cut the psf properly. :param psf_data: image of PSF :param psf_size: size of psf :return: re-sized and re-normalized PSF """ kernel = image_util.cut_edges(psf_data, psf_size) if normalisation is True: kernel = kernel_norm(kernel) return kernel
[docs] @export def pixel_kernel(point_source_kernel, subgrid_res=7): """Converts a pixelised kernel of a point source to a kernel representing a uniform extended pixel. :param point_source_kernel: :param subgrid_res: :return: convolution kernel for an extended pixel """ kernel_subgrid = subgrid_kernel(point_source_kernel, subgrid_res, num_iter=10) kernel_size = len(point_source_kernel) kernel_pixel = np.zeros((kernel_size * subgrid_res, kernel_size * subgrid_res)) for i in range(subgrid_res): k_x = int((kernel_size - 1) / 2 * subgrid_res + i) for j in range(subgrid_res): k_y = int((kernel_size - 1) / 2 * subgrid_res + j) kernel_pixel = image_util.add_layer2image( kernel_pixel, k_x, k_y, kernel_subgrid ) kernel_pixel = util.averaging( kernel_pixel, numGrid=kernel_size * subgrid_res, numPix=kernel_size ) return kernel_norm(kernel_pixel)
[docs] @export def kernel_average_pixel(kernel_super, supersampling_factor): """Computes the effective convolution kernel assuming a uniform surface brightness on the scale of a pixel. :param kernel_super: supersampled PSF of a point source (odd number per axis :param supersampling_factor: supersampling factor (int) :return: """ kernel_sum = np.sum(kernel_super) kernel_size = int(round(len(kernel_super) / float(supersampling_factor) + 0.5)) if kernel_size % 2 == 0: kernel_size += 1 n_high = kernel_size * supersampling_factor if n_high % 2 == 0: n_high += 1 kernel_pixel = np.zeros((n_high, n_high)) for i in range(supersampling_factor): k_x = int((kernel_size - 1) / 2 * supersampling_factor + i) for j in range(supersampling_factor): k_y = int((kernel_size - 1) / 2 * supersampling_factor + j) kernel_pixel = image_util.add_layer2image( kernel_pixel, k_x, k_y, kernel_super ) if supersampling_factor % 2 == 0: kernel_pixel = averaging_even_kernel(kernel_pixel, supersampling_factor) else: kernel_pixel = util.averaging(kernel_pixel, numGrid=n_high, numPix=kernel_size) kernel_pixel /= np.sum(kernel_pixel) return kernel_pixel * kernel_sum
[docs] @export def kernel_gaussian_grid(x_grid, y_grid, sigma): """Gaussian kernel. :param x: x-coordinate grid :param y: y-coordinate grid :param sigma: standard deviation of Gaussian :return: 2d kernel """ gaussian = Gaussian() kernel = gaussian.function( x_grid, y_grid, amp=1.0, sigma=sigma, center_x=0, center_y=0 ) kernel /= np.sum(kernel) return kernel
[docs] @export def kernel_gaussian(num_pix, delta_pix, fwhm): """Gaussian kernel. :param num_pix: number of pixels :param delta_pix: pixel scale :param fwhm: full width at half maximum :return: 2d kernel """ sigma = util.fwhm2sigma(fwhm) # if kernel_numPix % 2 == 0: # kernel_numPix += 1 x_grid, y_grid = util.make_grid(num_pix, delta_pix) kernel = kernel_gaussian_grid(x_grid, y_grid, sigma) kernel = util.array2image(kernel) return kernel
[docs] @export def kernel_moffat_grid(x, y, fwhm, moffat_beta): """Moffat kernel. :param x: x-coordinate grid :param y: y-coordinate grid :param fwhm: full width at half maximum :param moffat_beta: beta of Moffat profile :return: 2d kernel """ alpha = velocity_util.moffat_fwhm_alpha(fwhm, moffat_beta) moffat = Moffat() kernel = moffat.function(x=x, y=y, amp=1, alpha=alpha, beta=moffat_beta) kernel /= np.sum(kernel) return kernel
[docs] @export def kernel_moffat(num_pix, delta_pix, fwhm, moffat_beta): """Moffat kernel. :param delta_pix: pixel scale of kernel :param num_pix: number of pixels per axis of the kernel :param fwhm: full width at half maximum :param moffat_beta: beta of Moffat profile :return: 2d kernel """ x, y = util.make_grid(numPix=num_pix, deltapix=delta_pix) kernel = kernel_moffat_grid(x, y, fwhm, moffat_beta) return kernel
[docs] @export def split_kernel( kernel_super, supersampling_kernel_size, supersampling_factor, normalized=True ): """Pixel kernel and subsampling kernel such that the convolution of both applied on an image can be performed, i.e. smaller subsampling PSF and hole in larger PSF. :param kernel_super: super-sampled kernel :param supersampling_kernel_size: size of super-sampled PSF in units of degraded pixels :param normalized: boolean, if True returns a split kernel that is area normalized=1 representing a convolution kernel :return: degraded kernel with hole and super-sampled kernel """ if supersampling_factor <= 1: raise ValueError( "To split a kernel, the supersampling_factor needs to be > 1, given %s" % supersampling_factor ) if supersampling_kernel_size % 2 == 0: raise ValueError("supersampling_kernel_size needs to be an odd number!") n_super = len(kernel_super) n_sub = supersampling_kernel_size * supersampling_factor if n_sub % 2 == 0: n_sub += 1 if n_sub > n_super: n_sub = n_super kernel_hole = copy.deepcopy(kernel_super) n_min = int((n_super - 1) / 2 - (n_sub - 1) / 2) n_max = int((n_super - 1) / 2 + (n_sub - 1) / 2 + 1) kernel_hole[n_min:n_max, n_min:n_max] = 0 kernel_hole_resized = degrade_kernel( kernel_hole, degrading_factor=supersampling_factor ) kernel_subgrid_cut = kernel_super[n_min:n_max, n_min:n_max] if normalized is True: flux_subsampled = np.sum(kernel_subgrid_cut) flux_hole = np.sum(kernel_hole_resized) if flux_hole > 0: kernel_hole_resized *= (1.0 - flux_subsampled) / np.sum(kernel_hole_resized) else: kernel_subgrid_cut /= np.sum(kernel_subgrid_cut) else: kernel_hole_resized /= supersampling_factor**2 return kernel_hole_resized, kernel_subgrid_cut
[docs] @export def degrade_kernel(kernel_super, degrading_factor): """ :param kernel_super: higher resolution kernel (odd number per axis) :param degrading_factor: degrading factor (effectively the super-sampling resolution of the kernel given :return: degraded kernel with odd axis number with the sum of the flux/values in the kernel being preserved """ if degrading_factor == 1: return kernel_super if degrading_factor % 2 == 0: kernel_low_res = averaging_even_kernel(kernel_super, degrading_factor) else: kernel_low_res = averaging_odd_kernel(kernel_super, degrading_factor) # degrading_factor**2 # multiplicative factor added when providing flux conservation kernel_low_res *= degrading_factor**2 return kernel_low_res
def averaging_odd_kernel(kernel_super, degrading_factor): """""" n_kernel = len(kernel_super) numPix = int(round(n_kernel / degrading_factor + 0.5)) if numPix % 2 == 0: numPix += 1 n_high = numPix * degrading_factor kernel_super_ = np.zeros((n_high, n_high)) i_start = int((n_high - n_kernel) / 2) kernel_super_[ i_start : i_start + n_kernel, i_start : i_start + n_kernel ] = kernel_super kernel_low_res = util.averaging(kernel_super_, numGrid=n_high, numPix=numPix) return kernel_low_res
[docs] @export def averaging_even_kernel(kernel_high_res, subgrid_res): """Makes a lower resolution kernel based on the kernel_high_res (odd numbers) and the subgrid_res (even number), both meant to be centered. :param kernel_high_res: high resolution kernel with even subsampling resolution, centered :param subgrid_res: subsampling resolution (even number) :return: averaged undersampling kernel """ n_kernel_high_res = len(kernel_high_res) n_low = int(round(n_kernel_high_res / subgrid_res + 0.5)) if n_low % 2 == 0: n_low += 1 n_high = int(n_low * subgrid_res - 1) assert n_high % 2 == 1 if n_high == n_kernel_high_res: kernel_high_res_edges = kernel_high_res else: i_start = int((n_high - n_kernel_high_res) / 2) kernel_high_res_edges = np.zeros((n_high, n_high)) kernel_high_res_edges[i_start:-i_start, i_start:-i_start] = kernel_high_res kernel_low_res = np.zeros((n_low, n_low)) # adding pixels that are fully within a single re-binned pixel for i in range(subgrid_res - 1): for j in range(subgrid_res - 1): kernel_low_res += kernel_high_res_edges[i::subgrid_res, j::subgrid_res] # adding half of a pixel that has over-lap with two pixels i = subgrid_res - 1 for j in range(subgrid_res - 1): kernel_low_res[1:, :] += ( kernel_high_res_edges[i::subgrid_res, j::subgrid_res] / 2 ) kernel_low_res[:-1, :] += ( kernel_high_res_edges[i::subgrid_res, j::subgrid_res] / 2 ) j = subgrid_res - 1 for i in range(subgrid_res - 1): kernel_low_res[:, 1:] += ( kernel_high_res_edges[i::subgrid_res, j::subgrid_res] / 2 ) kernel_low_res[:, :-1] += ( kernel_high_res_edges[i::subgrid_res, j::subgrid_res] / 2 ) # adding a quarter of a pixel value that is at the boarder of four pixels i = subgrid_res - 1 j = subgrid_res - 1 kernel_edge = kernel_high_res_edges[i::subgrid_res, j::subgrid_res] kernel_low_res[1:, 1:] += kernel_edge / 4 kernel_low_res[:-1, 1:] += kernel_edge / 4 kernel_low_res[1:, :-1] += kernel_edge / 4 kernel_low_res[:-1, :-1] += kernel_edge / 4 return kernel_low_res / subgrid_res**2
[docs] @export def cutout_source(x_pos, y_pos, image, kernelsize, shift=True): """Cuts out point source (e.g. PSF estimate) out of image and shift it to the center of a pixel. :param x_pos: :param y_pos: :param image: :param kernelsize: :return: """ if kernelsize % 2 == 0: raise ValueError("even pixel number kernel size not supported!") x_int = int(round(x_pos)) y_int = int(round(y_pos)) n = len(image) d = (kernelsize - 1) / 2 x_max = int(np.minimum(x_int + d + 1, n)) x_min = int(np.maximum(x_int - d, 0)) y_max = int(np.minimum(y_int + d + 1, n)) y_min = int(np.maximum(y_int - d, 0)) image_cut = copy.deepcopy(image[y_min:y_max, x_min:x_max]) shift_x = x_int - x_pos shift_y = y_int - y_pos if shift is True: kernel_shift = de_shift_kernel(image_cut, shift_x, shift_y, iterations=50) else: kernel_shift = image_cut kernel_final = np.zeros((kernelsize, kernelsize)) k_l2_x = int((kernelsize - 1) / 2) k_l2_y = int((kernelsize - 1) / 2) xk_min = np.maximum(0, -x_int + k_l2_x) yk_min = np.maximum(0, -y_int + k_l2_y) xk_max = np.minimum(kernelsize, -x_int + k_l2_x + n) yk_max = np.minimum(kernelsize, -y_int + k_l2_y + n) kernel_final[yk_min:yk_max, xk_min:xk_max] = kernel_shift return kernel_final
[docs] @export def fwhm_kernel(kernel): """ :param kernel: :return: """ n = len(kernel) center = (n - 1) / 2.0 I_r = image_util.radial_profile(kernel, center=[center, center]) if n % 2 == 0: raise ValueError("only works with odd number of pixels in kernel!") max_flux = kernel[int((n - 1) / 2), int((n - 1) / 2)] I_2 = max_flux / 2.0 r = np.linspace(0, (n - 1) / 2, int((n + 1) / 2)) + 0.33 for i in range(1, len(I_r)): if I_r[i] < I_2: fwhm_2 = (I_2 - I_r[i - 1]) / (I_r[i] - I_r[i - 1]) + r[i - 1] return fwhm_2 * 2 raise ValueError( "The kernel did not drop to half the max value - fwhm not determined!" )
[docs] @export def estimate_amp(data, x_pos, y_pos, psf_kernel): """Estimates the amplitude of a point source located at x_pos, y_pos. :param data: :param x_pos: :param y_pos: :param psf_kernel: :return: """ numPix_x, numPix_y = np.shape(data) x_int = int(round(x_pos - 0.49999)) y_int = int(round(y_pos - 0.49999)) # TODO: make amplitude estimate not sucebtible to rounding effects on which pixels to chose to estimate the amplitude if x_int > 2 and x_int < numPix_x - 2 and y_int > 2 and y_int < numPix_y - 2: mean_image = max(np.sum(data[y_int - 2 : y_int + 3, x_int - 2 : x_int + 3]), 0) num = len(psf_kernel) center = int((num - 0.5) / 2) mean_kernel = np.sum( psf_kernel[center - 2 : center + 3, center - 2 : center + 3] ) amp_estimated = mean_image / mean_kernel else: amp_estimated = 0 return amp_estimated
[docs] @export def mge_kernel(kernel, order=5): """Azimutal Multi-Gaussian expansion of a pixelized kernel. :param kernel: 2d numpy array :return: """ # radial average n = len(kernel) center = (n - 1) / 2.0 psf_r = image_util.radial_profile(kernel, center=[center, center]) # MGE of radial average n_r = len(psf_r) r_array = np.linspace(start=0.0, stop=n_r - 1, num=n_r) amps, sigmas, norm = mge.mge_1d(r_array, psf_r, N=order, linspace=True) return amps, sigmas, norm
[docs] @export def match_kernel_size(image, size): """Matching kernel/image to a dedicated size by either expanding the image with zeros at the edges or chopping of the edges. :param image: 2d array (square with odd number of pixels) :param size: integer (odd number) :return: image with matched size, either by cutting or by adding zeros in the outskirts """ n = len(image) if n == size: return image image_copy = copy.deepcopy(image) if n > size: return image_copy[ int((n - size) / 2) : int(n - (n - size) / 2), int((n - size) / 2) : int(n - (n - size) / 2), ] else: image_add = np.zeros((size, size)) image_add[ int((size - n) / 2) : int(size - (size - n) / 2), int((size - n) / 2) : int(size - (size - n) / 2), ] = image_copy return image_add