Source code for lenstronomy.ImSim.Numerics.numba_convolution

import numpy as np

from lenstronomy.Util import numba_util
from lenstronomy.ImSim.Numerics.partial_image import PartialImage
from lenstronomy.Util import image_util

__all__ = ["NumbaConvolution"]


[docs] class NumbaConvolution(object): """Class to convolve explicit pixels only. the convolution is inspired by pyautolens: https://github.com/Jammy2211/PyAutoLens """
[docs] def __init__( self, kernel, conv_pixels, compute_pixels=None, nopython=True, cache=True, parallel=False, memory_raise=True, ): """ :param kernel: convolution kernel in units of the image pixels provided, odd length per axis :param conv_pixels: bool array same size as data, pixels to be convolved and their light to be blurred :param compute_pixels: bool array of size of image, these pixels (if True) will get blurred light from other pixels :param nopython: bool, numba jit setting to use python or compiled. :param cache: bool, numba jit setting to use cache :param parallel: bool, numba jit setting to use parallel mode :param memory_raise: bool, if True, checks whether memory required to store the convolution kernel is within certain bounds """ # numba_util.nopython = nopython # numba_util.cache = cache # numba_util.parallel = parallel self._memory_raise = memory_raise self._kernel = kernel self._conv_pixels = conv_pixels self._nx, self._ny = np.shape(conv_pixels) if compute_pixels is None: compute_pixels = np.ones_like(conv_pixels) compute_pixels = np.array(compute_pixels, dtype=bool) assert np.shape(conv_pixels) == np.shape(compute_pixels) self._mask = compute_pixels self._partialInput = PartialImage(partial_read_bools=conv_pixels) self._partialOutput = PartialImage(partial_read_bools=compute_pixels) index_array_out = self._partialOutput.index_array index_array_input = self._partialInput.index_array kernel_shape = kernel.shape self.kernel_max_size = kernel_shape[0] * kernel_shape[1] image_index = 0 if ( self._partialInput.num_partial * self.kernel_max_size > 10**9 and self._memory_raise is True ): raise ValueError( "kernel length %s combined with data size %s requires %s memory elements, which might" "exceed the memory limit and thus gives a raise. If you wish to ignore this raise, set" " memory_raise=False" % ( self.kernel_max_size, self._partialInput.num_partial, self._partialInput.num_partial * self.kernel_max_size, ) ) self._image_frame_indexes = np.zeros( (self._partialInput.num_partial, self.kernel_max_size), dtype="int" ) self._image_frame_psfs = np.zeros( (self._partialInput.num_partial, self.kernel_max_size) ) self._image_frame_lengths = np.zeros( (self._partialInput.num_partial), dtype="int" ) for x in range(index_array_input.shape[0]): for y in range(index_array_input.shape[1]): if conv_pixels[x][y]: ( image_frame_psfs, image_frame_indexes, frame_length, ) = self._pre_compute_frame_kernel( (x, y), self._kernel[:, :], compute_pixels, index_array_out ) self._image_frame_indexes[image_index, :] = image_frame_indexes self._image_frame_psfs[image_index, :] = image_frame_psfs self._image_frame_lengths[image_index] = frame_length image_index += 1
[docs] def convolve2d(self, image): """2d convolution. :param image: 2d numpy array, image to be convolved :return: convolved image, 2d numpy array """ image_array_partial = self._partialInput.partial_array(image) conv_array = self._convolve_jit( image_array_partial, num_data=self._partialOutput.num_partial, image_frame_kernels=self._image_frame_psfs, image_frame_indexes=self._image_frame_indexes, image_frame_lengths=self._image_frame_lengths, ) conv_image = self._partialOutput.image_from_partial(conv_array) return conv_image
@staticmethod @numba_util.jit() def _pre_compute_frame_kernel(image_index, kernel, mask, index_array): """ :param image_index: (int, int) index of pixels :param kernel: kernel, 2d rectangular array :param mask: mask (size of full image) :return: frame_kernels: values of kernel frame_indexes: (int) 1d index corresponding to the pixel receiving the kernel value frame_counter: number of pixels with non-zero addition due to kernel convolution """ kernel_shape = kernel.shape i0, j0 = image_index kx, ky = kernel_shape[0], kernel_shape[1] mask_shape = index_array.shape nx, ny = mask_shape[0], mask_shape[1] kx2 = int((kx - 1) / 2) ky2 = int((ky - 1) / 2) frame_counter = 0 frame_kernels = np.zeros(kx * ky) frame_indexes = np.zeros(kx * ky) for i in range(kx): for j in range(ky): x = i0 + i - kx2 y = j0 + j - ky2 if 0 <= x < nx and 0 <= y < ny: if mask[x, y]: frame_indexes[frame_counter] = index_array[x, y] frame_kernels[frame_counter] = kernel[i, j] frame_counter += 1 return frame_kernels, frame_indexes, frame_counter @staticmethod @numba_util.jit() def _convolve_jit( image_array, num_data, image_frame_kernels, image_frame_indexes, image_frame_lengths, ): """ :param image_array: selected subset of image in 1d array conventions :param num_data: number of 1d data that get convolved light and are output :param image_frame_kernels: list of indexes that have a response for certain pixel (as a list :param image_frame_lengths: length of image_frame_kernels :return: """ conv_array = np.zeros(num_data) for image_index in range( len(image_array) ): # loop through pixels that are to be blurred value = image_array[image_index] # value of pixel that gets blurred frame_length = image_frame_lengths[ image_index ] # number of pixels that gets assigned a fraction of the convolution frame_indexes = image_frame_indexes[ image_index ] # list of 1d indexes that get added flux from the blurred image frame_kernels = image_frame_kernels[ image_index ] # values of kernel for each frame indexes for kernel_index in range( frame_length ): # loop through all pixels that are impacted by the kernel of the pixel being blurred vector_index = frame_indexes[ kernel_index ] # 1d coordinate of pixel to be added value kernel = frame_kernels[kernel_index] # kernel response of pixel conv_array[vector_index] += value * kernel # ad value to pixel return conv_array
class SubgridNumbaConvolution(object): """Class that inputs a supersampled grid and convolution kernel and computes the response on the regular grid This makes use of the regualr NumbaConvolution class as a loop through the different sub-pixel positions.""" def __init__( self, kernel_super, supersampling_factor, conv_pixels, compute_pixels=None, kernel_size=None, nopython=True, cache=True, parallel=False, ): """ :param kernel_super: convolution kernel in units of super sampled pixels provided, odd length per axis :param supersampling_factor: factor of supersampling relative to pixel grid :param conv_pixels: bool array same size as data, pixels to be convolved and their light to be blurred :param compute_pixels: bool array of size of image, these pixels (if True) will get blurred light from other pixels :param nopython: bool, numba jit setting to use python or compiled. :param cache: bool, numba jit setting to use cache :param parallel: bool, numba jit setting to use parallel mode """ self._nx, self._ny = conv_pixels.shape self._supersampling_factor = supersampling_factor # loop through the different supersampling sectors self._numba_conv_list = [] if compute_pixels is None: compute_pixels = np.ones_like(conv_pixels) compute_pixels = np.array(compute_pixels, dtype=bool) for i in range(supersampling_factor): for j in range(supersampling_factor): # compute shifted psf kernel kernel = self._partial_kernel(kernel_super, i, j) if kernel_size is not None: kernel = image_util.cut_edges(kernel, kernel_size) numba_conv = NumbaConvolution( kernel, conv_pixels, compute_pixels=compute_pixels, nopython=nopython, cache=cache, parallel=parallel, ) self._numba_conv_list.append(numba_conv) def convolve2d(self, image_high_res): """ :param image_high_res: supersampled image/model to be convolved and re-bined to regular resolution :return: convolved and re-bind image """ conv_image = np.zeros((self._nx, self._ny)) count = 0 for i in range(self._supersampling_factor): for j in range(self._supersampling_factor): image_select = self._partial_image(image_high_res, i, j) conv_image += self._numba_conv_list[count].convolve2d(image_select) count += 1 return conv_image def _partial_image(self, image_high_res, i, j): """ :param image_high_res: 2d array supersampled :param i: index of super-sampled position in first axis :param j: index of super-sampled position in second axis :return: 2d array only selected the specific supersampled position within a regular pixel """ return image_high_res[ i :: self._supersampling_factor, j :: self._supersampling_factor ] def _partial_kernel(self, kernel_super, i, j): """ :param kernel_super: supersampled kernel :param i: index of super-sampled position in first axis :param j: index of super-sampled position in second axis :return: effective kernel rebinned to regular grid resulting from the subpersampled position (i,j) """ n = len(kernel_super) kernel_size = int(round(n / float(self._supersampling_factor) + 1.5)) if kernel_size % 2 == 0: kernel_size += 1 n_match = kernel_size * self._supersampling_factor kernel_super_match = np.zeros((n_match, n_match)) delta = int((n_match - n - self._supersampling_factor) / 2) + 1 i0 = delta # index where to start kernel for i=0 j0 = delta # index where to start kernel for j=0 (should be symmetric) kernel_super_match[i0 + i : i0 + i + n, j0 + j : j0 + j + n] = kernel_super # kernel_super_match = image_util.cut_edges(kernel_super_match, numPix=n) kernel = image_util.re_size( kernel_super_match, factor=self._supersampling_factor ) return kernel