import numpy as np
from tqdm import tqdm
import lenstronomy.Util.util as util
__all__ = ["PixelatedSourceReconstruction"]
[docs]
class PixelatedSourceReconstruction(object):
"""This class defines useful functions for pixelated source plane reconstruction in
gravitational lensing. It handles initialization of data, lens model, and PSF
kernel, and provides methods for generating the M and b matrices (definition see
'placeholder for Nan Zhang's paper) for source reconstruction based on different
likelihood methods.
All sparse matrices (sp) in this class are represented as a list of lists:
[[y_coord, x_coord, pixel_value], ...], where [y_coord, x_coord] are
the pixel coordinates and pixel_value is the corresponding pixel's value.
"""
[docs]
def __init__(
self, data_class, psf_class, lens_model_class, source_pixel_grid_class
):
"""Initializes the PixelatedSourceReconstruction class. This sets up the
necessary data, PSF, lens model, and source grid for subsequent source
reconstruction matrix generation.
:param data_class: ImageData() class instance (for the observed image data)
:param psf_class: PSF() class instance (for the observed image data)
:param lens_model_class: LensModel class instance
:param source_pixel_grid_class: PixelGrid() class instance (defining the source plane grid)
:raises ValueError:
- If the source pixel grid has rotational components or non-uniform pixel widths.
- If the PSF kernel size is improperly sized for interferometric likelihood methods.
"""
self._num_pix = data_class.num_pixel_axes[0]
self._image_data = data_class.data
self._noise_rms = data_class.background_rms
self._C_D = data_class.C_D
self._primary_beam = data_class.primary_beam
self._logL_method = data_class.likelihood_method()
# prepare for the rayshooting
self._x_grid_data, self._y_grid_data = data_class.pixel_coordinates
self._lens_model_class = lens_model_class
self._source_grid_class = source_pixel_grid_class
# Validate source grid properties: no rotation and uniform pixel width
transform_pix2angle_source = self._source_grid_class.transform_pix2angle
if (
transform_pix2angle_source[0, 1] != 0
or transform_pix2angle_source[1, 0] != 0
or transform_pix2angle_source[0, 0] != transform_pix2angle_source[1, 1]
):
raise ValueError(
"Source grid must be non-rotational and have uniform pixel width along x and y axes. "
"Ensure off-diagonal elements of 'transform_pix2angle_source' are zero "
"and diagonal elements are equal."
)
self._nx_source, self._ny_source = self._source_grid_class.num_pixel_axes
self._num_pixel_source = self._source_grid_class.num_pixel
self._pixel_width_source = self._source_grid_class.pixel_width
self._source_min_x, self._source_min_y = self._source_grid_class.radec_at_xy_0
self._ratio_data_pixel_source_pixel = (
data_class.pixel_area / source_pixel_grid_class.pixel_area
)
self._kernel = psf_class.kernel_point_source
self._shape_kernel = self._kernel.shape
# Validate PSF kernel size specifically for interferometric likelihood
if self._logL_method == "interferometry_natwt":
for check_dim in range(2):
if self._shape_kernel[check_dim] < 2 * self._num_pix - 1:
raise ValueError(
"PSF kernel size must be at least (2 * num_pix - 1) "
"in each dimension for interferometry_natwt likelihood."
)
[docs]
def generate_M_b(self, kwargs_lens, verbose=False, show_progress=True):
"""Generates the M matrix and the b vector for source reconstruction based on
the selected likelihood method. Definitions of the M and b is given by
(placeholder for Nan Zhang's paper).
:param kwargs_lens: List of keyword arguments for the lens_model_class.
:param verbose: If True, print progress messages during matrix generation steps.
Defaults to False.
:param show_progress: If True, show progress bar of generating the M matrix.
Defaults to True.
:returns: (M, b) tuple, where M is the matrix and b is the vector.
"""
if verbose:
# Print the total number of source pixels in the defined rectangular source region.
print(
"number of source pixels:",
self._source_grid_class.num_pixel,
"(x axis:",
self._source_grid_class.num_pixel_axes[0],
"pixels; ",
"y axis:",
self._source_grid_class.num_pixel_axes[1],
"pixels)",
)
print("likelihood method:", self._logL_method)
if self._logL_method == "diagonal":
M, b = self.generate_M_b_diagonal_likelihood(
kwargs_lens, verbose, show_progress
)
elif self._logL_method == "interferometry_natwt":
M, b = self.generate_M_b_interferometry_natwt_likelihood(
kwargs_lens, verbose, show_progress
)
return M, b
[docs]
def generate_M_b_diagonal_likelihood(
self, kwargs_lens, verbose=False, show_progress=True
):
"""Generates M and b matrices assuming spatially uncorrelated noise with uniform
RMS across the image. This approach is typically used for CCD image data.
This method performs lensing, convolution, and then computes M and b.
:param kwargs_lens: List of keyword arguments for the lens_model_class.
:param verbose: If True, print progress messages during matrix generation steps.
Defaults to False.
:param show_progress: If True, show progress bar of generating the M matrix.
Defaults to True.
:returns: (M, b) tuple, where M is the matrix and b is the vector.
"""
if verbose:
print("Step 1: Lensing the source pixels")
lensed_sp = self.lens_pixel_source_of_a_rectangular_region(kwargs_lens)
if verbose:
print("Step 1: Finished!")
if verbose:
print("Step 2: Convolve the lensed pixels")
N_lensed = len(lensed_sp)
lensed_pixel_conv_set = np.zeros((N_lensed, self._num_pix, self._num_pix))
for i in range(N_lensed):
lensed_pixel_conv_set[i] = self.sparse_convolution(
lensed_sp[i], self._kernel
)
if verbose:
print("Step 2: Finished!")
if verbose:
print("Step 3: Compute the matrix M and vector b")
M = np.zeros((N_lensed, N_lensed))
b = np.zeros((N_lensed))
for i in tqdm(
range(N_lensed),
desc="Running (iteration times vary)",
disable=not show_progress,
):
b[i] = np.sum(lensed_pixel_conv_set[i] * self._image_data / self._C_D)
for j in range(N_lensed):
if j < i:
M[i, j] = M[j, i] # Exploit symmetry
else:
M[i, j] = np.sum(
lensed_pixel_conv_set[i] * lensed_pixel_conv_set[j] / self._C_D
)
if verbose:
print("Step 3: Finished!")
return M, b
[docs]
def generate_M_b_interferometry_natwt_likelihood(
self, kwargs_lens, verbose=False, show_progress=True
):
"""Generates the M and b matrices for interferometric data with natural
weighting.
This method integrates the convolution step implicitly via specialized sparse
product functions.
:param kwargs_lens: List of keyword arguments for the lens_model_class.
:param verbose: If True, print progress messages during matrix generation steps.
Defaults to False.
:param show_progress: If True, show progress bar of generating the M matrix.
Defaults to True.
:returns: (M, b) tuple, where M is the matrix and b is the vector.
"""
if verbose:
print("Step 1: Lensing the source pixels")
lensed_sp = self.lens_pixel_source_of_a_rectangular_region(kwargs_lens)
if verbose:
print("Step 1: Finished!")
if verbose:
print(
"Step 2: Compute the matrix M and vector b (including the convolution step)"
)
N_lensed = len(lensed_sp)
M = np.zeros((N_lensed, N_lensed))
b = np.zeros((N_lensed))
for i in tqdm(
range(N_lensed),
desc="Running (iteration times vary)",
disable=not show_progress,
):
b[i] = self.sum_sparse_elementwise_product(lensed_sp[i], self._image_data)
pixel_lensed_convolved = self.sparse_convolution(
lensed_sp[i], self._kernel
) # convolution
for j in range(N_lensed):
if j < i:
M[i, j] = M[j, i] # Exploit symmetry
else:
M[i, j] = self.sum_sparse_elementwise_product(
lensed_sp[j], pixel_lensed_convolved
)
b /= self._noise_rms**2
M /= self._noise_rms**2
if verbose:
print("Step 2: Finished!")
return M, b
[docs]
def lens_pixel_source_of_a_rectangular_region(self, kwargs_lens):
"""Maps image plane pixels to source plane pixels within a specified rectangular
source grid, considering lensing deflections and applying bilinear
interpolation.
:param kwargs_lens: List of keyword arguments for the lens_model_class.
:returns: A list of lists. Each element in the outer list corresponds to a single source pixel
within the defined source grid (ordered by their linear index). Each inner list contains
`[y_coord, x_coord, weight]` tuples, indicating that the source pixel at this index
contributes with `weight` to the image plane pixel at `(y_coord, x_coord)` when lensed.
:rtype: list
"""
lensed_pixel_sp = [[] for _ in range(self._num_pixel_source)]
beta_x_grid_2d, beta_y_grid_2d = self._lens_model_class.ray_shooting(
self._x_grid_data, self._y_grid_data, kwargs=kwargs_lens
)
# Calculate integer pixel indices (floor/ceiling) in the source plane
x_floor = np.floor(
(beta_x_grid_2d - self._source_min_x) / self._pixel_width_source
).astype(int)
x_ceiling = x_floor + 1
y_floor = np.floor(
(beta_y_grid_2d - self._source_min_y) / self._pixel_width_source
).astype(int)
y_ceiling = y_floor + 1
# Calculate fractional pixel offsets for bilinear interpolation
delta_x_pixel = (
beta_x_grid_2d - self._source_min_x
) / self._pixel_width_source - x_floor
delta_y_pixel = (
beta_y_grid_2d - self._source_min_y
) / self._pixel_width_source - y_floor
# Compute bilinear interpolation weights
w00 = (1 - delta_x_pixel) * (1 - delta_y_pixel)
w10 = delta_x_pixel * (1 - delta_y_pixel)
w01 = delta_y_pixel * (1 - delta_x_pixel)
w11 = delta_x_pixel * delta_y_pixel
# Apply the ratio (data image pixel area / source grid pixel area) to ensure the flux conservation
w00 *= self._ratio_data_pixel_source_pixel
w10 *= self._ratio_data_pixel_source_pixel
w01 *= self._ratio_data_pixel_source_pixel
w11 *= self._ratio_data_pixel_source_pixel
# Apply primary beam modulation if specified
if self._primary_beam is not None:
w00 *= self._primary_beam
w10 *= self._primary_beam
w01 *= self._primary_beam
w11 *= self._primary_beam
for i in range(self._num_pix):
for j in range(self._num_pix):
# Skip if the lensed image pixel falls entirely outside the source region
if (
x_ceiling[i][j] < 0
or x_floor[i][j] > (self._nx_source - 1)
or y_ceiling[i][j] < 0
or y_floor[i][j] > (self._ny_source - 1)
):
continue
# Calculate linear indices of the four potential source pixels
source_pixel_index00 = self._nx_source * (y_floor[i][j]) + x_floor[i][j]
source_pixel_index10 = source_pixel_index00 + 1
source_pixel_index01 = source_pixel_index00 + self._nx_source
source_pixel_index11 = source_pixel_index01 + 1
# Flags for boundary checking
check00, check01, check10, check11 = True, True, True, True
# Adjust flags for contributions near the boundary
if x_floor[i][j] == -1:
check00 = False
check01 = False
elif x_ceiling[i][j] == self._nx_source:
check10 = False
check11 = False
if y_floor[i][j] == -1:
check00 = False
check10 = False
elif y_ceiling[i][j] == self._ny_source:
check11 = False
check01 = False
# Append the contribution of each source pixel to the image pixel (i,j) with weight.
if check00:
lensed_pixel_sp[source_pixel_index00].append([i, j, w00[i][j]])
if check10:
lensed_pixel_sp[source_pixel_index10].append([i, j, w10[i][j]])
if check01:
lensed_pixel_sp[source_pixel_index01].append([i, j, w01[i][j]])
if check11:
lensed_pixel_sp[source_pixel_index11].append([i, j, w11[i][j]])
return lensed_pixel_sp
[docs]
def lens_an_image_by_rayshooting(self, kwargs_lens, source_image):
"""Lenses a pixelated source plane image to the image plane using ray-shooting
and bilinear interpolation. The imput image should have the same dimension and
coordinates defined by source_pixel_grid_class.
This method works by iterating through each pixel in the image plane, ray-shooting back to the source
plane to find the corresponding source coordinate, and then interpolating the flux from the input
source image at that coordinate. This provides an approximate lensed image, as flux between exact
source pixels is derived via interpolation. Note that the primary beam will NOT be applied on the
lensed image in this function.
:param kwargs_lens: List of keyword arguments for the lens_model_class.
:param image: 2D NumPy array representing the pixelated source plane image. Expected to have
dimensions defined by source_pixel_grid_class.
:type image: numpy.ndarray
:returns: 2D NumPy array representing the lensed image in the image plane.
:rtype: numpy.ndarray
:raises ValueError: If the input `image` dimensions do not match the dimensions of the
defined source pixel grid (`self._ny_source`, `self._nx_source`).
"""
ny_source_check, nx_source_check = np.shape(source_image)
if nx_source_check != self._nx_source or ny_source_check != self._ny_source:
raise ValueError(
f"Input image size ({ny_source_check}, {nx_source_check}) must match the defined "
f"source grid class dimension ({self._ny_source}, {self._nx_source})."
)
lensed_image = np.zeros((self._num_pix, self._num_pix))
beta_x_grid_2d, beta_y_grid_2d = self._lens_model_class.ray_shooting(
self._x_grid_data, self._y_grid_data, kwargs=kwargs_lens
)
beta_x_grid = util.image2array(beta_x_grid_2d)
beta_y_grid = util.image2array(beta_y_grid_2d)
# Iterate through each pixel in the image plane (i, j)
for i in range(self._num_pix):
for j in range(self._num_pix):
# Calculate the linear index for accessing pre-computed ray-shot coordinates
n_beta = (
i * self._num_pix + j
) # Assuming beta_x_grid and beta_y_grid are flattened row-major
# Get the source plane angular coordinates (beta_x, beta_y) corresponding
# to the current image plane pixel (i,j) after ray-shooting.
cor_beta_x = beta_x_grid[n_beta]
cor_beta_y = beta_y_grid[n_beta]
# Convert source plane angular coordinates to integer pixel indices (n_y, n_x)
# within the source grid, relative to self._minx, self._miny.
n_x = np.floor(
(cor_beta_x - self._source_min_x) / self._pixel_width_source
).astype(int)
n_y = np.floor(
(cor_beta_y - self._source_min_y) / self._pixel_width_source
).astype(int)
# If the ray shoots outside the defined source image boundaries, set flux to zero
if (
n_x > self._nx_source - 1
or n_y > self._ny_source - 1
or n_x < 0
or n_y < 0
):
lensed_image[i, j] = 0
else:
# Calculate bilinear interpolation weights for the four surrounding source pixels.
# These weights depend on the sub-pixel position of (cor_beta_y, cor_beta_x)
# within the source pixel (n_y, n_x).
weight_upper_left = (
np.abs(
self._source_min_y
+ n_y * self._pixel_width_source
+ self._pixel_width_source
- cor_beta_y
)
* (
np.abs(
self._source_min_x
+ n_x * self._pixel_width_source
+ self._pixel_width_source
- cor_beta_x
)
)
/ (self._pixel_width_source**2)
)
weight_upper_right = (
np.abs(
self._source_min_y
+ n_y * self._pixel_width_source
+ self._pixel_width_source
- cor_beta_y
)
* (
np.abs(
self._source_min_x
+ n_x * self._pixel_width_source
- cor_beta_x
)
)
/ (self._pixel_width_source**2)
)
weight_lower_left = (
np.abs(
self._source_min_x
+ n_x * self._pixel_width_source
+ self._pixel_width_source
- cor_beta_x
)
* (
np.abs(
self._source_min_y
+ n_y * self._pixel_width_source
- cor_beta_y
)
)
/ (self._pixel_width_source**2)
)
weight_lower_right = (
np.abs(
self._source_min_x
+ n_x * self._pixel_width_source
- cor_beta_x
)
* (
np.abs(
self._source_min_y
+ n_y * self._pixel_width_source
- cor_beta_y
)
)
/ (self._pixel_width_source**2)
)
# Interpolate flux from the source image using the calculated weights
lensed_image[i, j] = source_image[n_y, n_x] * weight_upper_left
if n_x + 1 < self._nx_source:
lensed_image[i, j] += (
source_image[n_y, n_x + 1] * weight_upper_right
)
if n_y + 1 < self._ny_source:
lensed_image[i, j] += (
source_image[n_y + 1, n_x] * weight_lower_left
)
if n_x + 1 < self._nx_source:
lensed_image[i, j] += (
source_image[n_y + 1, n_x + 1] * weight_lower_right
)
# Apply the ratio (data image pixel area / source grid pixel area) to ensure the flux conservation
lensed_image *= self._ratio_data_pixel_source_pixel
return lensed_image
[docs]
def sparse_to_array(self, sparse):
"""Converts a sparse image representation (list of `[y_coord, x_coord, value]`
tuples) to a 2D NumPy array.
:param sparse: A list representing non-zero elements of the sparse image.
:returns: A 2D NumPy array representing the full image.
:rtype: numpy.ndarray
"""
image = np.zeros((self._num_pix, self._num_pix))
num_of_elements = len(sparse)
for i in range(num_of_elements):
image[sparse[i][0], sparse[i][1]] = sparse[i][2]
return image
[docs]
def sum_sparse_elementwise_product(self, sparse, ordinary):
"""Computes the element-wise sum of products between a sparse matrix and a dense
2D NumPy array image.
:param sparse: Sparse matrix representation (list of `[y_coord, x_coord, value]`
tuples).
:param ordinary: A 2D NumPy array (dense matrix).
:returns: The sum of the element-wise products.
:rtype: float
"""
sum_temp = 0
num_element = len(sparse)
for i in range(num_element):
sum_temp += sparse[i][2] * ordinary[sparse[i][0], sparse[i][1]]
return sum_temp
[docs]
def sparse_convolve_and_dot_product(self, sp1, sp2, kernel=None):
"""Computes the convolution product of two sparse matrices using a given kernel.
Equivalent to `(sp1 * kernel) . (sp2)` where '*' is convolution and '.' is dot
product.
:param sp1: First sparse matrix representation.
:param sp2: Second sparse matrix representation.
:param kernel: The 2D PSF kernel (NumPy array). Assumed to be square with odd dimensions,
with its center at the central pixel. If None, `self._kernel` is used
:returns: The result of the convolution product.
:rtype: float
"""
if kernel is None:
kernel = self._kernel
inner_product = 0
num_elements_1 = len(sp1)
num_elements_2 = len(sp2)
kernel_center = int(
len(kernel) / 2
) # Assumes kernel is square and has odd dimensions
for i in range(num_elements_1):
for j in range(num_elements_2):
delta_y = sp2[j][0] - sp1[i][0]
delta_x = sp2[j][1] - sp1[i][1]
if (
kernel_center + delta_y >= 0
and kernel_center + delta_y < self._shape_kernel[0]
and kernel_center + delta_x >= 0
and kernel_center + delta_x < self._shape_kernel[1]
):
inner_product += (
sp1[i][2]
* sp2[j][2]
* kernel[kernel_center + delta_y, kernel_center + delta_x]
)
return inner_product
[docs]
def sparse_convolution(self, sp, kernel=None):
"""Performs convolution of a sparse matrix with a given kernel.
:param sp: Sparse matrix representation.
:param kernel: The 2D PSF kernel (NumPy array). Assumed to be square with odd dimensions,
with its center at the central pixel. If None, `self._kernel` is used
:returns: A 2D NumPy array representing the convolved image.
:rtype: numpy.ndarray
"""
if kernel is None:
kernel = self._kernel
kernel_center = int(
len(kernel) / 2
) # Assumes kernel is square and has odd dimensions
convolved = np.zeros((self._num_pix, self._num_pix))
num_element_sparse = len(sp)
for i in range(num_element_sparse):
y_sp, x_sp, val_sp = sp[i][0], sp[i][1], sp[i][2]
# Calculate slice indices for the kernel relative to the sparse element
slice_y_start = np.max([kernel_center - y_sp, 0])
slice_y_end = np.min(
[kernel_center - y_sp + self._num_pix, self._shape_kernel[0]]
)
slice_x_start = np.max([kernel_center - x_sp, 0])
slice_x_end = np.min(
[kernel_center - x_sp + self._num_pix, self._shape_kernel[1]]
)
convolved_image_y_start = y_sp - np.min([y_sp, kernel_center])
convolved_image_y_end = y_sp + np.min(
[self._num_pix - y_sp, self._shape_kernel[0] - kernel_center]
)
convolved_image_x_start = x_sp - np.min([x_sp, kernel_center])
convolved_image_x_end = x_sp + np.min(
[self._num_pix - x_sp, self._shape_kernel[1] - kernel_center]
)
convolved[
convolved_image_y_start:convolved_image_y_end,
convolved_image_x_start:convolved_image_x_end,
] += (
val_sp * kernel[slice_y_start:slice_y_end, slice_x_start:slice_x_end]
)
return convolved