Source code for martini.beams

"""Provide classes to represent the beam of a radio telescope."""

from abc import ABCMeta, abstractmethod
from collections.abc import Callable
import scipy.interpolate
import numpy as np
import astropy.units as U
from martini.datacube import DataCube


class _BaseBeam(object):
    """
    Implement a radio telescope beam model.

    Classes inheriting from _BaseBeam must implement three methods:
    :meth:`~martini.beams._BaseBeam.f_kernel`,
    :meth:`~martini.beams._BaseBeam.kernel_size_px` and
    :meth:`~martini.beams._BaseBeam.init_beam_header`.

    :meth:`~martini.beams._BaseBeam.f_kernel` should return a function accepting
    two arguments, the RA and Dec offsets from the beam centroid (provided with
    units of arcsec), and returning the beam amplitude at that location.

    :meth:`~martini.beams._BaseBeam.kernel_px_size` should return a 2-tuple
    containing the half-size (x, y) of the beam image that will be initialized,
    in pixels.

    :meth:`~martini.beams._BaseBeam.init_beam_header` should be defined if the
    major/minor axis FWHM of the beam and its position angle are not defined when
    the beam object is initialized, for instance if modelling a particular telescope
    this function can be used to set the (constant) parameters of the beam of that
    particular facility.

    Parameters
    ----------
    bmaj : ~astropy.units.Quantity
        :class:`~astropy.units.Quantity`, with dimensions of angle.
        Beam major axis (FWHM) angular size.

    bmin : ~astropy.units.Quantity
        :class:`~astropy.units.Quantity`, with dimensions of angle.
        Beam minor axis (FWHM) angular size.

    bpa : ~astropy.units.Quantity
        :class:`~astropy.units.Quantity`, with dimesions of angle.
        Beam position angle (East from North).

    See Also
    --------
    martini.beams.GaussianBeam
    """

    __metaclass__ = ABCMeta

    bmaj: U.Quantity[U.arcsec]
    bmin: U.Quantity[U.arcsec]
    bpa: U.Quantity[U.deg]
    px_size: U.Quantity[U.arcsec] | None
    kernel: U.Quantity[U.dimensionless_unscaled] | None
    area: U.Quantity[U.arcsec**2]
    vel: U.Quantity[U.km / U.s]
    ra: U.Quantity[U.deg]
    dec: U.Quantity[U.deg]

    def __init__(
        self,
        bmaj: U.Quantity[U.arcsec] = 15.0 * U.arcsec,
        bmin: U.Quantity[U.arcsec] = 15.0 * U.arcsec,
        bpa: U.Quantity[U.deg] = 0.0 * U.deg,
    ) -> None:
        # some beams need information from the datacube; in this case make
        # their call to _BaseBeam.__init__ with bmaj == bmin == bpa == None
        # and define a init_beam_header, to be called after the ra, dec,
        # vel, etc. of the datacube are known
        self.bmaj = bmaj
        self.bmin = bmin
        self.bpa = bpa
        self.px_size = None
        self.kernel = None

        # since bmaj, bmin are FWHM, need to include conversion to
        # gaussian-equivalent width (2sqrt(2log2)sigma = FWHM), and then
        # A = 2pi * sigma_maj * sigma_min = pi * b_maj * b_min / 4 / log2
        self.area = (np.pi * self.bmaj * self.bmin) / 4 / np.log(2)

        return

    def needs_pad(self) -> tuple[int, int]:
        """
        Determine the padding of the datacube required by the beam.

        The beam should be padded enough to prevent edge effects during convolution.

        Returns
        -------
        tuple
            2-tuple, each element an integer, containing pad dimensions (x, y).
        """
        if self.kernel is None:
            raise RuntimeError("Beam kernel not initialized.")
        return self.kernel.shape[0] // 2, self.kernel.shape[1] // 2

    def init_kernel(self, datacube: DataCube) -> None:
        """
        Calculate the required size of the beam image.

        Parameters
        ----------
        datacube : ~martini.datacube.DataCube
            Data cube to use, cube size is required for pixel size, position &
            velocity centroids.
        """
        self.px_size = datacube.px_size
        self.vel = datacube.spectral_centre
        self.ra = datacube.ra
        self.dec = datacube.dec
        if (self.bmaj is None) or (self.bmin is None) or (self.bpa is None):
            self.init_beam_header()
        npx_x, npx_y = self.kernel_size_px()
        if self.px_size is not None:
            px_size_unit = self.px_size.unit
        else:
            raise RuntimeError("Beam pixel size not initialized.")
        px_edges_x = np.arange(-npx_x - 0.5, npx_x + 0.50001, 1) * self.px_size
        px_edges_y = np.arange(-npx_y - 0.5, npx_y + 0.50001, 1) * self.px_size
        # Elliptical Gaussian has no analytic surface integral in cartesian coordinates
        # and other beam shapes presumably much worse, so let's make a spline interpolator
        # based on a fine sampling of the beam function and integrate that.
        fine_sample_x = np.arange(-npx_x - 0.5, npx_x + 0.501, 0.1) * self.px_size
        fine_sample_y = np.arange(-npx_y - 0.5, npx_y + 0.501, 0.1) * self.px_size
        # set meshgrid indexing to avoid unintentional transpose
        rbs = scipy.interpolate.RectBivariateSpline(
            fine_sample_x.to_value(px_size_unit),
            fine_sample_y.to_value(px_size_unit),
            self.f_kernel()(
                *np.meshgrid(fine_sample_x, fine_sample_y, indexing="ij")
            ).to_value(px_size_unit**-2),
            kx=3,
            ky=3,
        )
        # rbs.integral only evaluates a point at a time, resort to np.vectorize
        xgrid, ygrid = np.meshgrid(
            px_edges_x.to_value(px_size_unit), px_edges_y.to_value(px_size_unit)
        )
        # f_kernel is in px_size_unit ** -2, xgrid and ygrid are in px_size_unit so 2D
        # integral is dimensionless:
        self.kernel = (
            np.vectorize(rbs.integral)(
                xgrid[1:, :-1], xgrid[1:, 1:], ygrid[:-1, 1:], ygrid[1:, 1:]
            )
            * U.dimensionless_unscaled
        )

        # can turn 2D beam into a 3D beam here; use above for central channel
        # then shift in frequency up and down for other channels
        # then probably need to adjust convolution step to do the 2D
        # convolution on a stack

        return

    @abstractmethod
    def f_kernel(
        self,
    ) -> Callable[[float | np.ndarray, float | np.ndarray], U.Quantity]:
        """
        Return a function defining the beam amplitude as a function of position.

        The function returned by this method should accept two parameters, the
        RA and Dec offset from the beam centroid, and return the beam amplitude
        at that position. The offsets are provided as :class:`~astropy.units.Quantity`
        objects with dimensions of angle (arcsec).
        """
        pass  # pragma: no cover

    @abstractmethod
    def kernel_size_px(self) -> tuple[int, int]:
        """
        Return a 2-tuple specifying the half-size of the beam image to be initialized.

        Size is in pixels.
        """
        pass  # pragma: no cover

    @abstractmethod
    def init_beam_header(self) -> None:
        """
        Set beam major/minor axis lengths and position angle.

        This method is optional, and only needs to be defined if these
        parameters are not specified in the call to the ``__init__`` method of the
        derived class.
        """
        pass  # pragma: no cover


[docs] class GaussianBeam(_BaseBeam): """ Implement a Gaussian beam model. Parameters ---------- bmaj : ~astropy.units.Quantity :class:`~astropy.units.Quantity`, with dimensions of angle. Beam major axis (FWHM) angular size. bmin : ~astropy.units.Quantity :class:`~astropy.units.Quantity`, with dimensions of angle. Beam minor axis (FWHM) angular size. bpa : ~astropy.units.Quantity :class:`~astropy.units.Quantity`, with dimensions of angle. Beam position angle (East of North). truncate : float Number of FWHM at which to truncate the beam image. """ truncate: float def __init__( self, bmaj: U.Quantity[U.arcsec] = 15.0 * U.arcsec, bmin: U.Quantity[U.arcsec] = 15.0 * U.arcsec, bpa: U.Quantity[U.deg] = 0.0 * U.deg, truncate: float = 4.0, ) -> None: self.truncate = truncate super().__init__(bmaj=bmaj, bmin=bmin, bpa=bpa) return
[docs] def f_kernel( self, ) -> Callable[[float | np.ndarray, float | np.ndarray], U.Quantity]: """ Return a function defining the beam amplitude as a function of position. The model implemented is a 2D Gaussian with FWHM's specified by ``bmaj`` and ``bmin`` and orientation by ``bpa``. Returns ------- callable Accepts 2 arguments (both of :class:`~numpy.ndarray` type) and return an :class:`~numpy.ndarray` of corresponding size. """ def fwhm_to_sigma(fwhm: U.Quantity[U.deg]) -> U.Quantity[U.deg]: """ Convert full-width at half-maximum to Gaussian sigma. Parameters ---------- fwhm : ~astropy.units.Quantity Full-width at half-maximum. """ return fwhm / (2.0 * np.sqrt(2.0 * np.log(2.0))) sigmamaj = fwhm_to_sigma(self.bmaj) # arcsec sigmamin = fwhm_to_sigma(self.bmin) # arcsec a = np.power(np.cos(self.bpa), 2) / (2.0 * np.power(sigmamin, 2)) + np.power( np.sin(self.bpa), 2 ) / (2.0 * np.power(sigmamaj, 2)) # signs set for CCW rotation (PA) b = -np.sin(2.0 * self.bpa) / (4 * np.power(sigmamin, 2)) + np.sin( 2.0 * self.bpa ) / (4 * np.power(sigmamaj, 2)) c = np.power(np.sin(self.bpa), 2) / (2.0 * np.power(sigmamin, 2)) + np.power( np.cos(self.bpa), 2 ) / (2.0 * np.power(sigmamaj, 2)) A = np.power(2.0 * np.pi * sigmamin * sigmamaj, -1) # arcsec^-2 return lambda x, y: ( A * np.exp(-a * np.power(x, 2) - 2.0 * b * x * y - c * np.power(y, 2)) )
[docs] def kernel_size_px(self) -> tuple[int, int]: """ Return a 2-tuple specifying the half-size of the beam image to be initialized. Size measured in pixels. Returns ------- tuple 2-tuple, each element an integer, specifying the kernel size (x, y) in pixels. """ size = np.ceil( (self.bmaj * self.truncate).to_value( U.pix, U.pixel_scale(self.px_size / U.pix) ) + 1 ) return size, size
[docs] def init_beam_header(self) -> None: """Do nothing - beam header initialized in __init__ for this class.""" pass # pragma: no cover