Source code for martini.martini

"""
Create mock observations of HI sources from cosmological hydrodynamical simulations.

Provides :class:`~martini.martini.Martini`, the main class of the package, and a
simplified :class:`~martini.martini.GlobalProfile` class for use when only a spectrum (no
spatial information) is desired.
"""

import warnings
import subprocess
import os
from scipy.signal import fftconvolve
import numpy as np
import astropy.units as U
from astropy.io import fits
from astropy.time import Time
from astropy.coordinates import Angle
from astropy import __version__ as astropy_version
from itertools import product
from .__version__ import __version__ as martini_version
from warnings import warn
from martini.beams import _BaseBeam
from martini.datacube import DataCube, _GlobalProfileDataCube
from martini.sources import SPHSource
from martini.sph_kernels import DiracDeltaKernel, _BaseSPHKernel
from martini.spectral_models import _BaseSpectrum
from martini.noise import _BaseNoise
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import h5py
    from matplotlib.figure import Figure

try:
    with open(os.devnull, "w") as _devnull:
        gc = subprocess.check_output(
            ["git", "describe", "--always"],
            stderr=_devnull,
            cwd=os.path.dirname(os.path.realpath(__file__)),
        )
except (subprocess.CalledProcessError, FileNotFoundError):  # pragma: no cover
    gc = b""
else:
    martini_version = martini_version + "_commit_" + gc.strip().decode()


class _BaseMartini:
    """
    Common methods for the core Martini class and related classes.

    Parameters
    ----------
    source : SPHSource
        An instance of a class derived from :class:`martini.sources.SPHSource`.
        A description of the HI emitting object, including position, geometry
        and an interface to the simulation data (SPH particle masses,
        positions, etc.). See :doc:`sub-module documentation </sources/index>`.

    datacube : ~martini.datacube.DataCube
        A :class:`~martini.datacube.DataCube` instance.
        A description of the datacube to create, including pixels, channels,
        sky position. See :doc:`sub-module documentation </datacube/index>`.

    beam : _BaseBeam, optional
        An instance of a class derived from `~martini.beams._BaseBeam`.
        A description of the beam for the simulated telescope. Given a
        description, either mathematical or as an image, the creation of a
        custom beam is straightforward. See
        :doc:`sub-module documentation </beams/index>`.

    noise : _BaseNoise, optional
        An instance of a class derived from :class:`~martini.noise._BaseNoise`.
        A description of the simulated noise. A simple Gaussian noise model is
        provided; implementation of other noise models is straightforward. See
        :doc:`sub-module documentation </noise/index>`.

    sph_kernel : _BaseSPHKernel
        An instance of a class derived from :class:`~martini.sph_kernels._BaseSPHKernel`.
        A description of the SPH smoothing kernel. Check simulation
        documentation for the kernel used in a particular simulation, and
        :doc:`SPH kernel sub-module documentation </sph_kernels/index>` for guidance.

    spectral_model : _BaseSpectrum
        An instance of a class derived from
        :class:`~martini.spectral_models._BaseSpectrum`.
        A description of the HI line produced by a particle of given
        properties. A Dirac-delta spectrum, and both fixed-width and
        temperature-dependent Gaussian line models are provided; implementing
        other models is straightforward. See
        :doc:`sub-module documentation </spectral_models/index>`.

    quiet : bool, optional
        If ``True``, suppress output to stdout.

    _prune_kwargs : dict
        Arguments to pass through to the :meth:`martini.martini.Martini._prune_particles`
        function, intended for internal use only.

    See Also
    --------
    martini.sources.sph_source.SPHSource
    martini.datacube.DataCube
    martini.beams
    martini.noise
    martini.sph_kernels
    martini.spectral_models
    martini.martini.GlobalProfile
    """

    source: SPHSource
    _datacube: DataCube
    beam: _BaseBeam | None
    noise: _BaseNoise | None
    sph_kernel: _BaseSPHKernel
    spectral_model: _BaseSpectrum
    quiet: bool

    def __init__(
        self,
        *,
        source: SPHSource,
        datacube: DataCube,
        beam: _BaseBeam | None = None,
        noise: _BaseNoise | None = None,
        sph_kernel: _BaseSPHKernel,
        spectral_model: _BaseSpectrum,
        quiet: bool = False,
        _prune_kwargs: dict = {},
    ) -> None:
        self.quiet = quiet
        self.source = source
        self._datacube = datacube
        self.beam = beam
        self.noise = noise
        self.sph_kernel = sph_kernel
        self.spectral_model = spectral_model

        if self.beam is not None:
            self.beam.init_kernel(self._datacube)
            self._datacube.add_pad(self.beam.needs_pad())

        self.source._init_skycoords()
        self.source._init_pixcoords(self._datacube)  # after datacube is padded

        self.sph_kernel._init_sm_lengths(source=self.source, datacube=self._datacube)
        self.sph_kernel._init_sm_ranges()
        self._prune_particles(
            **_prune_kwargs
        )  # prunes both source, and kernel if applicable

        return

    def init_spectra(self) -> None:
        """
        Explicitly trigger evaluation of particle spectra.

        Triggered when :meth:`~martini.martini.Martini.insert_source_in_cube` is called if
        not called explicitly before this.
        """
        if not self.quiet:
            print("Initializing spectra...")
        self.spectral_model.init_spectra(self.source, self._datacube)
        if not self.quiet:
            print("Spectra initialized.")

        return

    def _prune_particles(
        self,
        spatial: bool = True,
        spectral: bool = True,
        mass: bool = True,
        obj_type_str: str = "data cube",
    ) -> None:
        """
        Identify and remove particles that cannot contribute to the DataCube.

        This helps speed up the later calculations. Assumes the kernel is 0 at
        distances greater than the kernel size (which may differ from the
        SPH smoothing length).

        Parameters
        ----------
        spatial : bool
            If ``True``, prune particles that fall outside the spatial aperture.
        spectral : bool
            If ``True``, prune particles that fall outside the spectral bandwidth.
        mass : bool
            If ``True``, prune particles that have zero HI mass.
        obj_type_str : str
            String describing the object to be pruned for messages.
        """
        if not self.quiet:
            print(
                f"Source module contained {self.source.npart} particles with total HI"
                f" mass of {self.source.mHI_g.sum() << U.Msun:.2e}."
            )
        spectrum_half_width = self.spectral_model.half_width(self.source) / np.max(
            np.abs(np.diff(self._datacube.velocity_channel_edges))
        )
        spatial_reject_conditions = (
            (
                (
                    self.source.pixcoords[:2] + self.sph_kernel.sm_ranges[np.newaxis]
                    < 0 * U.pix
                ).any(axis=0),
                self.source.pixcoords[0] - self.sph_kernel.sm_ranges
                > (self._datacube.n_px_x + self._datacube.padx * 2) * U.pix,
                self.source.pixcoords[1] - self.sph_kernel.sm_ranges
                > (self._datacube.n_px_y + self._datacube.pady * 2) * U.pix,
                np.isnan(self.source.pixcoords[:2]).any(axis=0),
            )
            if spatial
            else tuple()
        )
        spectral_reject_conditions = (
            (
                self.source.pixcoords[2] + 4 * spectrum_half_width * U.pix < 0 * U.pix,
                self.source.pixcoords[2] - 4 * spectrum_half_width * U.pix
                > self._datacube.n_channels * U.pix,
            )
            if spectral
            else tuple()
        )
        mass_reject_conditions = (self.source.mHI_g == 0,) if mass else tuple()
        accept_mask = np.logical_not(
            np.logical_or.reduce(
                spatial_reject_conditions
                + spectral_reject_conditions
                + mass_reject_conditions
            )
        )
        self.source.apply_mask(accept_mask)
        self.sph_kernel._apply_mask(accept_mask)
        if not self.quiet:
            print(
                f"Pruned particles that will not contribute to {obj_type_str}, "
                f"{self.source.npart} particles remaining with total HI mass of "
                f"{self.source.mHI_g.sum() << U.Msun:.2e}."
            )
        return

    def _evaluate_pixel_spectrum(
        self, ij_px: tuple[int, int]
    ) -> U.Quantity[U.Jy / U.arcsec**2]:
        """
        Add up contributions of particles to the spectrum in a pixel.

        This is the main operation in the core loop of MARTINI. It is embarrassingly
        parallel. To support parallel excecution we accept storing up to a copy of the
        entire (future) datacube in one-pixel pieces. This avoids the need for concurrent
        access to the datacube by parallel processes, which would in the simplest case
        duplicate a copy of the datacube array per parallel process! In realistic use
        cases the memory overhead from the equivalent of a second datacube array should be
        minimal - memory-limited applications should be limited by the memory consumed by
        particle data, which is not duplicated in parallel execution.

        Parameters
        ----------
        ij_px : tuple
            A 2-tuple containing integers specifying the indices (i, j) of pixels in the
            grid where the spectrum should be calculated.

        Returns
        -------
        ~astropy.units.Quantity
            :class:`~astropy.units.Quantity` with dimensions of Jy per sq. arcsec.
            A 1D array containing the spectrum, whose length must match the length of the
            spectral axis of the datacube.
        """
        ij = np.array(ij_px)[..., np.newaxis] * U.pix
        mask = (
            np.abs(ij - self.source.pixcoords[:2]) <= self.sph_kernel.sm_ranges
        ).all(axis=0)
        weights = self.sph_kernel._px_weight(
            self.source.pixcoords[:2, mask] - ij, mask=mask
        )
        assert self.spectral_model.spectra is not None
        tmp = self.spectral_model.spectra[mask]
        np.multiply(tmp, weights[:, np.newaxis], out=tmp)
        insertion_values = np.sum(tmp, axis=-2)
        del tmp
        return insertion_values

    def _insert_source_in_cube(
        self,
        skip_validation: bool = False,
        progressbar: bool | None = None,
        ncpu: int = 1,
        quiet: bool | None = None,
    ) -> None:
        """
        Populate the :class:`~martini.datacube.DataCube` with flux from source particles.

        Parameters
        ----------
        skip_validation : bool, optional
            SPH kernel interpolation onto the DataCube is approximated for
            increased speed. For some combinations of pixel size, distance
            and SPH smoothing length, the approximation may break down. The
            kernel class will check whether this will occur and raise a
            RuntimeError if so. This validation can be skipped (at the cost
            of accuracy!) by setting this parameter True.

        progressbar : bool, optional
            A progress bar is shown by default. If martini was initialised with
            `quiet` set to `True`, progress bars are switched off unless explicitly
            turned on.

        ncpu : int
            Number of processes to use in main source insertion loop. Using more than
            one cpu requires the `multiprocess` module (n.b. not the same as
            `multiprocessing`).

        quiet : bool, optional
            If ``True``, suppress output to stdout. If specified, takes precedence over
            quiet parameter of class.
        """
        if self.spectral_model.spectra is None:
            self.init_spectra()

        if progressbar is None:
            progressbar = not self.quiet

        self.sph_kernel._confirm_validation(noraise=skip_validation, quiet=self.quiet)

        ij_pxs = list(
            product(
                np.arange(self._datacube._array.shape[0]),
                np.arange(self._datacube._array.shape[1]),
            )
        )

        # figure out which progressbar style to use
        from tqdm.autonotebook import tqdm

        if ncpu == 1:
            self._datacube._array += U.Quantity(
                [
                    self._evaluate_pixel_spectrum(ij_px)
                    for ij_px in tqdm(ij_pxs, disable=not progressbar)
                ]
            ).reshape(self._datacube._array.shape)
        else:
            from multiprocess.pool import ThreadPool

            # not multiprocessing, need serialization from dill not pickle

            total = len(ij_pxs)
            with ThreadPool(processes=ncpu) as pool:
                self._datacube._array += U.Quantity(
                    list(
                        tqdm(
                            pool.imap(
                                self._evaluate_pixel_spectrum,
                                ij_pxs,
                            ),
                            total=total,
                            disable=not progressbar,
                        )
                    )
                ).reshape(self._datacube._array.shape)

        self._datacube._array = self._datacube._array.to(
            U.Jy / U.arcsec**2, equivalencies=[self._datacube.arcsec2_to_pix]
        )
        pad_mask = (
            np.s_[
                self._datacube.padx : -self._datacube.padx,
                self._datacube.pady : -self._datacube.pady,
                ...,
            ]
            if self._datacube.padx > 0 and self._datacube.pady > 0
            else np.s_[...]
        )
        inserted_flux_density = np.sum(
            self._datacube._array[pad_mask] * self._datacube.px_size**2
        ).to(U.Jy)
        inserted_mass = (
            2.36e5
            * U.Msun
            * self.source.distance.to_value(U.Mpc) ** 2
            * np.sum(
                (self._datacube._array[pad_mask] * self._datacube.px_size**2)
                .sum((0, 1))
                .squeeze()
                .to_value(U.Jy)
                * np.abs(
                    np.diff(self._datacube.velocity_channel_edges.to_value(U.km / U.s))
                )
            )
        )
        if (quiet is None and not self.quiet) or (quiet is not None and not quiet):
            print(
                "Source inserted.",
                f"  Flux density in cube: {inserted_flux_density:.2e}",
                f"  Mass in cube (assuming distance {self.source.distance:.2f} and a"
                f" spatially resolved source):"
                f" {inserted_mass:.2e}",
                f"    [{inserted_mass / self.source.input_mass * 100 << 1:.0f}%"
                f" of initial source mass]",
                f"  Maximum pixel: {self._datacube._array.max():.2e}",
                "  Median non-zero pixel:"
                f" {np.median(self._datacube._array[self._datacube._array > 0]):.2e}",
                sep="\n",
            )
        return

    def reset(self) -> None:
        """Re-initialize the :class:`~martini.datacube.DataCube` with zero-values."""
        init_kwargs = {
            "n_px_x": self._datacube.n_px_x,
            "n_px_y": self._datacube.n_px_y,
            "n_channels": self._datacube.n_channels,
            "px_size": self._datacube.px_size,
            "channel_width": self._datacube.channel_width,
            "spectral_centre": self._datacube.spectral_centre,
            "ra": self._datacube.ra,
            "dec": self._datacube.dec,
            "stokes_axis": self._datacube.stokes_axis,
        }
        self._datacube = DataCube(**init_kwargs)
        if self.beam is not None:
            self._datacube.add_pad(self.beam.needs_pad())
        return

    def preview(
        self,
        max_points: int = 5000,
        fig: int = 1,
        lim: str | U.Quantity[U.deg] | None = None,
        vlim: str | U.Quantity[U.km / U.s] | None = None,
        point_scaling: str = "auto",
        title: str = "",
        save: str | None = None,
    ) -> "Figure":
        """
        Produce a figure showing the source particle coordinates and velocities.

        The data cube region is also outlined.

        Makes a 3-panel figure showing the projection of the source as it will appear in
        the mock observation. The first panel shows the particles in the y-z plane,
        coloured by the x-component of velocity (MARTINI projects the source along the
        x-axis). The second and third panels are position-velocity diagrams showing the
        x-component of velocity against the y and z coordinates, respectively. The red
        boxes drawn on the panels show the data cube extent in position and velocity at
        the distance and systemic velocity of the source.

        Parameters
        ----------
        max_points : int, optional
            Maximum number of points to draw per panel, the particles will be randomly
            subsampled if the source has more.

        fig : int, optional
            Number of the figure in matplotlib, it will be created as ``plt.figure(fig)``.

        lim : ~astropy.units.Quantity, optional
            :class:`~astropy.units.Quantity` with dimensions of angle.
            The coordinate axes extend from ``-lim`` to ``lim``. If unspecified, the
            maximum angular separation between the source centre defined by its ``ra`` and
            ``dec`` and the particle sky positions is used. Whether specified or not, the
            axes are expanded if necessary to contain the extent of the data cube.
            Alternatively, the string ``"datacube"`` can be passed. In this case the axis
            limits are set by the extent of the data cube.

        vlim : ~astropy.units.Quantity, optional
            :class:`~astropy.units.Quantity` with dimensions of speed.
            The velocity axes and colour bar extend from ``-vlim`` to ``vlim``. If
            unspecified, the maximum absolute velocity of particles in the source is
            used. Whether specified or not, the axes are expanded if necessary to contain
            the extent of the data cube. Alternatively, ``"datacube"`` can be
            passed. In this case the axis limits are set by the extent of the data cube.

        point_scaling : str, optional
            By default points are scaled in size and transparency according to their HI
            mass and the smoothing length (loosely proportional to their surface
            densities, but with different scaling to achieve a visually useful plot). For
            some sources the automatic scaling may not give useful results, using
            ``point_scaling="fixed"`` will plot points of constant size without opacity.

        title : str, optional
            A title for the figure can be provided.

        save : str, optional
            If provided, the figure is saved using ``plt.savefig(save)``. A `.png` or
            `.pdf` suffix is recommended.

        Returns
        -------
        matplotlib.figure.Figure
            The preview :class:`~matplotlib.figure.Figure`.
        """
        import matplotlib.pyplot as plt

        # pass through arguments, except save (which we will do later if desired)
        figure = self.source.preview(
            max_points=max_points,
            fig=fig,
            lim=lim if lim != "datacube" else None,
            vlim=vlim if vlim != "datacube" else None,
            point_scaling=point_scaling,
            title=title,
            save=None,
        )
        y_cube = (
            Angle(self._datacube.ra)
            .wrap_at(self.source.ra + 180 * U.deg)
            .to_value(U.deg)
        )
        z_cube = self._datacube.dec.to_value(U.deg)
        v_cube = self._datacube.spectral_centre.to_value(U.km / U.s)
        dy_cube = 0.5 * (self._datacube.px_size * self._datacube.n_px_x).to_value(
            U.deg
        )  # half-length
        dz_cube = 0.5 * (self._datacube.px_size * self._datacube.n_px_y).to_value(
            U.deg
        )  # half-length
        dv_cube = 0.5 * (
            self._datacube.channel_width * self._datacube.n_channels
        ).to_value(U.km / U.s)
        if dy_cube > 5:
            warn("DataCube RA range > 5deg, preview is very approximate!")
        if dz_cube > 5:
            warn("DataCube Dec range > 5deg, preview is very approximate!")
        if np.abs(self._datacube.dec.to_value(U.deg)) + 0.5 * dz_cube > 85:
            warn("DataCube extent within 5deg of pole, preview is very approximate!")
        sp1, sp2, sp3, cb = figure.get_axes()
        sp1.add_patch(
            plt.Rectangle(
                (
                    (y_cube - dy_cube),
                    (z_cube - dz_cube),
                ),
                2 * dy_cube,
                2 * dz_cube,
                linewidth=5,
                edgecolor="red",
                facecolor="None",
            )
        )
        sp2.add_patch(
            plt.Rectangle(
                (
                    (y_cube - dy_cube),
                    (v_cube - dv_cube),
                ),
                2 * dy_cube,
                2 * dv_cube,
                linewidth=5,
                edgecolor="red",
                facecolor="None",
            )
        )
        sp3.add_patch(
            plt.Rectangle(
                (
                    (z_cube - dz_cube),
                    (v_cube - dv_cube),
                ),
                2 * dz_cube,
                2 * dv_cube,
                linewidth=5,
                edgecolor="red",
                facecolor="None",
            )
        )
        if lim == "datacube":
            sp1.set_xlim((y_cube + dy_cube), (y_cube - dy_cube))
            sp1.set_ylim((z_cube - dz_cube), (z_cube + dz_cube))
            sp2.set_xlim((y_cube + dy_cube), (y_cube - dy_cube))
            sp3.set_xlim((z_cube - dz_cube), (z_cube + dz_cube))
        elif lim is None:
            sp1.set_xlim(
                (
                    max((y_cube + 1.05 * dy_cube), sp1.get_xlim()[0]),
                    min((y_cube - 1.05 * dy_cube), sp1.get_xlim()[1]),
                )
            )
            sp2.set_xlim(
                (
                    max((y_cube + 1.05 * dy_cube), sp2.get_xlim()[0]),
                    min((y_cube - 1.05 * dy_cube), sp2.get_xlim()[1]),
                )
            )
            sp1.set_ylim(
                (
                    min((z_cube - 1.05 * dz_cube), sp1.get_ylim()[0]),
                    max((z_cube + 1.05 * dz_cube), sp1.get_ylim()[1]),
                )
            )
            sp3.set_xlim(
                (
                    min((z_cube - 1.05 * dz_cube), sp3.get_xlim()[0]),
                    max((z_cube + 1.05 * dz_cube), sp3.get_xlim()[1]),
                )
            )
        if vlim == "datacube":
            sp2.set_ylim(
                (v_cube - dv_cube),
                (v_cube + dv_cube),
            )
            sp3.set_ylim(
                (v_cube - dv_cube),
                (v_cube + dv_cube),
            )
        elif vlim is None:
            sp2.set_ylim(
                (
                    min((v_cube - 1.05 * dv_cube), sp2.get_ylim()[0]),
                    max((v_cube + 1.05 * dv_cube), sp2.get_ylim()[1]),
                )
            )
            sp3.set_ylim(
                (
                    min((v_cube - 1.05 * dv_cube), sp3.get_ylim()[0]),
                    max((v_cube + 1.05 * dv_cube), sp3.get_ylim()[1]),
                )
            )

        if save is not None:
            plt.savefig(save)
        return figure


[docs] class Martini(_BaseMartini): """ Creates synthetic HI data cubes from simulation data. Usual use of martini involves first creating instances of classes from each of the required and optional sub-modules, then creating a :class:`~martini.martini.Martini` with these instances as arguments. The object can then be used to create synthetic observations, usually by calling :meth:`~martini.martini.Martini.insert_source_in_cube`, (optionally) :meth:`~martini.martini.Martini.add_noise`, (optionally) :meth:`~martini.martini.Martini.convolve_beam` and :meth:`~martini.martini.Martini.write_fits` in order. Parameters ---------- source : SPHSource An instance of a class derived from :class:`~martini.sources.sph_source.SPHSource`. A description of the HI emitting object, including position, geometry and an interface to the simulation data (SPH particle masses, positions, etc.). See :doc:`sub-module documentation </sources/index>`. datacube : ~martini.datacube.DataCube A :class:`~martini.datacube.DataCube` instance. A description of the datacube to create, including pixels, channels, sky position. See :doc:`sub-module documentation </datacube/index>`. beam : _BaseBeam, optional An instance of a class derived from `~martini.beams._BaseBeam`. A description of the beam for the simulated telescope. Given a description, either mathematical or as an image, the creation of a custom beam is straightforward. See :doc:`sub-module documentation </beams/index>`. noise : _BaseNoise, optional An instance of a class derived from :class:`~martini.noise._BaseNoise`. A description of the simulated noise. A simple Gaussian noise model is provided; implementation of other noise models is straightforward. See :doc:`sub-module documentation </noise/index>`. sph_kernel : _BaseSPHKernel An instance of a class derived from :class:`~martini.sph_kernels._BaseSPHKernel`. A description of the SPH smoothing kernel. Check simulation documentation for the kernel used in a particular simulation, and :doc:`SPH kernel sub-module documentation </sph_kernels/index>` for guidance. spectral_model : _BaseSpectrum An instance of a class derived from :class:`~martini.spectral_models._BaseSpectrum`. A description of the HI line produced by a particle of given properties. A Dirac-delta spectrum, and both fixed-width and temperature-dependent Gaussian line models are provided; implementing other models is straightforward. See :doc:`sub-module documentation </spectral_models/index>`. quiet : bool, optional If ``True``, suppress output to stdout. See Also -------- martini.sources.sph_source.SPHSource martini.datacube.DataCube martini.beams martini.noise martini.sph_kernels martini.spectral_models martini.martini.GlobalProfile Examples -------- More detailed examples can be found in the examples directory in the github distribution of the package. The following example illustrates basic use of MARTINI, using a (very!) crude model of a gas disk. This example can be run by doing 'from martini import demo; demo()':: # ------make a toy galaxy---------- N = 500 phi = np.random.rand(N) * 2 * np.pi r = [] for L in np.random.rand(N): def f(r): return L - 0.5 * (2 - np.exp(-r) * (np.power(r, 2) + 2 * r + 2)) r.append(fsolve(f, 1.0)[0]) r = np.array(r) # exponential disk r *= 3 / np.sort(r)[N // 2] z = -np.log(np.random.rand(N)) # exponential scale height z *= 0.5 / np.sort(z)[N // 2] * np.sign(np.random.rand(N) - 0.5) x = r * np.cos(phi) y = r * np.sin(phi) xyz_g = np.vstack((x, y, z)) * U.kpc # linear rotation curve vphi = 100 * r / 6.0 vx = -vphi * np.sin(phi) vy = vphi * np.cos(phi) # small pure random z velocities vz = (np.random.rand(N) * 2.0 - 1.0) * 5 vxyz_g = np.vstack((vx, vy, vz)) * U.km * U.s**-1 T_g = np.ones(N) * 8e3 * U.K mHI_g = np.ones(N) / N * 5.0e9 * U.Msun # ~mean interparticle spacing smoothing hsm_g = np.ones(N) * 4 / np.sqrt(N) * U.kpc # --------------------------------- source = SPHSource( distance=3.0 * U.Mpc, rotation={"L_coords": (60.0 * U.deg, 0.0 * U.deg)}, ra=0.0 * U.deg, dec=0.0 * U.deg, h=0.7, T_g=T_g, mHI_g=mHI_g, xyz_g=xyz_g, vxyz_g=vxyz_g, hsm_g=hsm_g, ) datacube = DataCube( n_px_x=128, n_px_y=128, n_channels=32, px_size=10.0 * U.arcsec, channel_width=10.0 * U.km * U.s**-1, spectral_centre=source.vsys, ) beam = GaussianBeam( bmaj=30.0 * U.arcsec, bmin=30.0 * U.arcsec, bpa=0.0 * U.deg, truncate=4.0 ) noise = GaussianNoise(rms=3.0e-5 * U.Jy * U.beam**-1) spectral_model = GaussianSpectrum(sigma=7 * U.km * U.s**-1) sph_kernel = CubicSplineKernel() M = Martini( source=source, datacube=datacube, beam=beam, noise=noise, spectral_model=spectral_model, sph_kernel=sph_kernel, ) M.insert_source_in_cube() M.add_noise() M.convolve_beam() M.write_beam_fits(beamfile) M.write_fits(cubefile) print(f"Wrote demo fits output to {cubefile}, and beam image to {beamfile}.") try: M.write_hdf5(hdf5file) except ModuleNotFoundError: print("h5py package not present, skipping hdf5 output demo.") else: print(f"Wrote demo hdf5 output to {hdf5file}.") """ def __init__( self, *, source: SPHSource, datacube: DataCube, beam: _BaseBeam | None = None, noise: _BaseNoise | None = None, sph_kernel: _BaseSPHKernel, spectral_model: _BaseSpectrum, quiet: bool = False, ) -> None: super().__init__( source=source, datacube=datacube, beam=beam, noise=noise, sph_kernel=sph_kernel, spectral_model=spectral_model, quiet=quiet, ) return @property def datacube(self) -> DataCube: """ The :class:`~martini.datacube.DataCube` object for this mock observation. Returns ------- ~martini.datacube.DataCube The :class:`~martini.datacube.DataCube` contained by this :class:`~martini.martini.Martini` instance. """ return self._datacube
[docs] def insert_source_in_cube( self, skip_validation: bool = False, progressbar: bool | None = None, ncpu: int = 1, ) -> None: """ Populate the DataCube with flux from the particles in the source. Parameters ---------- skip_validation : bool, optional SPH kernel interpolation onto the :class:`~martini.datacube.DataCube` is approximated for increased speed. For some combinations of pixel size, distance and SPH smoothing length, the approximation may break down. The kernel class will check whether this will occur and raise a ``RuntimeError`` if so. This validation can be skipped (at the cost of accuracy!) by setting this parameter ``True``. progressbar : bool, optional A progress bar is shown by default. Progress bars work, with perhaps some visual glitches, in parallel. If :class:`~martini.martini.Martini` was initialised with ``quiet`` set to ``True``, progress bars are switched off unless explicitly turned on. ncpu : int Number of processes to use in main source insertion loop. Using more than one cpu requires the :mod:`multiprocess` module (n.b. not the same as ``multiprocessing``). """ super()._insert_source_in_cube( skip_validation=skip_validation, progressbar=progressbar, ncpu=ncpu ) return
[docs] def convolve_beam(self) -> None: """Convolve the beam and data cube.""" if self.beam is None: warn("Skipping beam convolution, no beam object provided to Martini.") return minimum_padding = self.beam.needs_pad() if (self._datacube.padx < minimum_padding[0]) or ( self._datacube.pady < minimum_padding[1] ): raise ValueError( "datacube padding insufficient for beam convolution (perhaps you loaded a" " datacube state with datacube.load_state that was previously initialized" " by martini with a smaller beam?)" ) unit = self._datacube._array.unit assert self.beam.kernel is not None # placate mypy for spatial_slice in self._datacube.spatial_slices: # use a view [...] to force in-place modification spatial_slice[...] = ( fftconvolve(spatial_slice, self.beam.kernel, mode="same") * unit ) self._datacube.drop_pad() self._datacube._array = self._datacube._array.to( U.Jy * U.beam**-1, equivalencies=U.beam_angular_area(self.beam.area), ) if not self.quiet: print( "Beam convolved.", " Data cube RMS after beam convolution:" f" {np.std(self._datacube._array):.2e}", f" Maximum pixel: {self._datacube._array.max():.2e}", " Median non-zero pixel:" f" {np.median(self._datacube._array[self._datacube._array > 0]):.2e}", sep="\n", ) return
[docs] def add_noise(self) -> None: """Insert noise into the data cube.""" if self.noise is None: warn("Skipping noise, no noise object provided to Martini.") return if self.beam is None: warn( "Skipping noise, no beam object (required to estimate post-convolution" " noise level) provided to Martini." ) return # this unit conversion means noise can be added before or after source insertion: noise_cube = ( self.noise.generate(self._datacube, self.beam) .to( U.Jy * U.arcsec**-2, equivalencies=U.beam_angular_area(self.beam.area), ) .to( self._datacube._array.unit, equivalencies=[self._datacube.arcsec2_to_pix], ) ) self._datacube._array = self._datacube._array + noise_cube if not self.quiet: print( "Noise added.", f" Noise cube RMS: {np.std(noise_cube):.2e} (before beam convolution).", " Data cube RMS after noise addition (before beam convolution): " f"{np.std(self._datacube._array):.2e}", sep="\n", ) return
[docs] def write_fits( self, filename: str, overwrite: bool = True, obj_name: str = "MOCK", channels: None = None, # deprecated dtype: str | np.dtype | None = None, ) -> None: """ Output the data cube to a FITS-format file. Parameters ---------- filename : str Name of the file to write. ``'.fits'`` will be appended if not already present in the filename. ``'.fits.gz'`` etc. will be recognized. overwrite : bool, optional Whether to allow overwriting existing files. obj_name : str Name to write in the ``OBJECT`` FITS header field (max 16 characters). channels : str, deprecated Deprecated, channels and their units now fixed at :class:`~martini.datacube.DataCube` initialization. dtype : str or dtype Typecode or data-type to which the array is cast. Should be supported by fits. Default to not do data type conversion. """ if channels is not None: # pragma: no cover warnings.warn( DeprecationWarning( "`channels` argument to `write_fits` ignored, channels and their" " units now fixed at DataCube initialization." ) ) self._datacube.drop_pad() filename = str(filename) filename = filename + ".fits" if "fits" not in filename.split(".") else filename wcs_header = self._datacube.wcs.to_header() wcs_header.rename_keyword("WCSAXES", "NAXIS") header = fits.Header() header.append(("SIMPLE", "T")) header.append(("BITPIX", 16)) header.append(("NAXIS", wcs_header["NAXIS"])) header.append(("NAXIS1", self._datacube.n_px_x)) header.append(("NAXIS2", self._datacube.n_px_y)) header.append(("NAXIS3", self._datacube.n_channels)) if self._datacube.stokes_axis: header.append(("NAXIS4", 1)) header.append(("EXTEND", "T")) header.append(("CDELT1", wcs_header["CDELT1"])) header.append(("CRPIX1", wcs_header["CRPIX1"])) header.append(("CRVAL1", wcs_header["CRVAL1"])) header.append(("CTYPE1", wcs_header["CTYPE1"])) header.append(("CUNIT1", wcs_header["CUNIT1"])) header.append(("CDELT2", wcs_header["CDELT2"])) header.append(("CRPIX2", wcs_header["CRPIX2"])) header.append(("CRVAL2", wcs_header["CRVAL2"])) header.append(("CTYPE2", wcs_header["CTYPE2"])) header.append(("CUNIT2", wcs_header["CUNIT2"])) header.append(("CDELT3", wcs_header["CDELT3"])) header.append(("CRPIX3", wcs_header["CRPIX3"])) header.append(("CRVAL3", wcs_header["CRVAL3"])) header.append(("CTYPE3", wcs_header["CTYPE3"])) header.append(("CUNIT3", wcs_header["CUNIT3"])) if self._datacube.stokes_axis: header.append(("CDELT4", wcs_header["CDELT4"])) header.append(("CRPIX4", wcs_header["CRPIX4"])) header.append(("CRVAL4", wcs_header["CRVAL4"])) header.append(("CTYPE4", wcs_header["CTYPE4"])) header.append(("CUNIT4", "PAR")) header.append(("EPOCH", 2000)) header.append(("INSTRUME", "MARTINI", martini_version)) header.append(("BSCALE", 1.0)) header.append(("BZERO", 0.0)) datacube_array_units = self._datacube._array.unit header.append( ("DATAMAX", np.max(self._datacube._array.to_value(datacube_array_units))) ) header.append( ("DATAMIN", np.min(self._datacube._array.to_value(datacube_array_units))) ) header.append(("ORIGIN", "astropy v" + astropy_version)) # long names break fits format, don't let the user set this if len(obj_name) > 16: warnings.warn( "obj_name longer than 16 characters, truncating", RuntimeWarning ) obj_name = obj_name[:16] header.append(("OBJECT", obj_name)) if self.beam is not None: header.append(("BPA", self.beam.bpa.to_value(U.deg))) header.append(("OBSERVER", "K. Oman")) header.append(("BUNIT", datacube_array_units.to_string("fits"))) header.append(("DATE-OBS", Time.now().to_value("fits"))) header.append(("MJD-OBS", Time.now().to_value("mjd"))) if self.beam is not None: header.append(("BMAJ", self.beam.bmaj.to_value(U.deg))) header.append(("BMIN", self.beam.bmin.to_value(U.deg))) header.append(("BTYPE", "Intensity")) header.append(("SPECSYS", wcs_header["SPECSYS"])) header.append(("RESTFRQ", wcs_header["RESTFRQ"])) # flip axes to write data = self._datacube._array.to_value(datacube_array_units).T if dtype is not None: data = data.astype(dtype, copy=False) hdu = fits.PrimaryHDU(header=header, data=data) hdu.writeto(filename, overwrite=overwrite) return
[docs] def write_beam_fits( self, filename: str, overwrite: bool = True, channels: None = None, # deprecated ) -> None: """ Output the beam to a FITS-format file. The beam is written to file, with pixel sizes, coordinate system, etc. similar to those used for the :class:`~martini.datacube.DataCube`. Parameters ---------- filename : str Name of the file to write. ``".fits"`` will be appended if not already present. overwrite : bool, optional Whether to allow overwriting existing files. channels : str, deprecated Deprecated, channels and their units now fixed at :class:`~martini.datacube.DataCube` initialization. Raises ------ ValueError If :class:`~martini.martini.Martini` was initialized without a ``beam``. """ if channels is not None: # pragma: no cover warnings.warn( DeprecationWarning( "`channels` argument to `write_fits` ignored, channels and their" " units now fixed at DataCube initialization." ) ) if self.beam is None: raise ValueError("Martini.write_beam_fits: Called with beam set to 'None'.") assert self.beam.kernel is not None filename = filename if filename[-5:] == ".fits" else filename + ".fits" wcs_header = self._datacube.wcs.to_header() beam_kernel_units = self.beam.kernel.unit header = fits.Header() header.append(("SIMPLE", "T")) header.append(("BITPIX", 16)) header.append(("NAXIS", 3)) header.append(("NAXIS1", self.beam.kernel.shape[0])) header.append(("NAXIS2", self.beam.kernel.shape[1])) header.append(("NAXIS3", 1)) header.append(("EXTEND", "T")) header.append(("BSCALE", 1.0)) header.append(("BZERO", 0.0)) # this is 1/arcsec^2, is this right? header.append(("BUNIT", beam_kernel_units.to_string("fits"))) header.append(("CRPIX1", self.beam.kernel.shape[0] // 2 + 1)) header.append(("CDELT1", wcs_header["CDELT1"])) header.append(("CRVAL1", wcs_header["CRVAL1"])) header.append(("CTYPE1", wcs_header["CTYPE1"])) header.append(("CUNIT1", wcs_header["CUNIT1"])) header.append(("CRPIX2", self.beam.kernel.shape[1] // 2 + 1)) header.append(("CDELT2", wcs_header["CDELT2"])) header.append(("CRVAL2", wcs_header["CRVAL2"])) header.append(("CTYPE2", wcs_header["CTYPE2"])) header.append(("CUNIT2", wcs_header["CUNIT2"])) header.append(("CRPIX3", 1)) header.append(("CDELT3", wcs_header["CDELT3"])) header.append(("CRVAL3", wcs_header["CRVAL3"])) header.append(("CTYPE3", wcs_header["CTYPE3"])) header.append(("CUNIT3", wcs_header["CUNIT3"])) header.append(("SPECSYS", wcs_header["SPECSYS"])) header.append(("BMAJ", self.beam.bmaj.to_value(U.deg))) header.append(("BMIN", self.beam.bmin.to_value(U.deg))) header.append(("BPA", self.beam.bpa.to_value(U.deg))) header.append(("BTYPE", "beam ")) header.append(("EPOCH", 2000)) header.append(("OBSERVER", "K. Oman")) # long names break fits format header.append(("OBJECT", "MOCKBEAM")) header.append(("INSTRUME", "MARTINI", martini_version)) header.append(("DATAMAX", np.max(self.beam.kernel.to_value(beam_kernel_units)))) header.append(("DATAMIN", np.min(self.beam.kernel.to_value(beam_kernel_units)))) header.append(("ORIGIN", "astropy v" + astropy_version)) # flip axes to write hdu = fits.PrimaryHDU( header=header, data=self.beam.kernel.to_value(beam_kernel_units)[..., np.newaxis].T, ) hdu.writeto(filename, overwrite=True) return
[docs] def write_hdf5( self, filename: str, overwrite: bool = True, memmap: bool = False, compact: bool = False, channels: None = None, # deprecated ) -> "h5py.File | None": """ Output the data cube and beam to a HDF5-format file. Requires the :mod:`h5py` package. Parameters ---------- filename : str Name of the file to write. ``'.hdf5'`` will be appended if not already present. overwrite : bool, optional Whether to allow overwriting existing files. memmap : bool, optional If ``True``, create a file-like object in memory and return it instead of writing file to disk. compact : bool, optional If ``True``, omit pixel coordinate arrays to save disk space. In this case pixel coordinates can still be reconstructed from FITS-style keywords stored in the FluxCube attributes. channels : str, deprecated Deprecated, channels and their units now fixed at :class:`~martini.datacube.DataCube` initialization. """ if channels is not None: # pragma: no cover warnings.warn( DeprecationWarning( "`channels` argument to `write_fits` ignored, channels and their" " units now fixed at DataCube initialization." ) ) import h5py self._datacube.drop_pad() filename = filename if filename[-5:] == ".hdf5" else filename + ".hdf5" wcs_header = self._datacube.wcs.to_header() mode = "w" if overwrite else "x" driver = "core" if memmap else None h5_kwargs = {"backing_store": False} if memmap else {} f = h5py.File(filename, mode, driver=driver, **h5_kwargs) datacube_array_units = self._datacube._array.unit f["FluxCube"] = self._datacube._array.to_value(datacube_array_units).squeeze() c = f["FluxCube"] origin = 0 # index from 0 like numpy, not from 1 if not compact: # voxel centre coordinates: xgrid, ygrid, vgrid = np.meshgrid( np.arange(self._datacube._array.shape[0]), np.arange(self._datacube._array.shape[1]), np.arange(self._datacube._array.shape[2]), indexing="ij", ) cgrid = ( np.vstack( ( xgrid.flatten(), ygrid.flatten(), vgrid.flatten(), np.zeros(vgrid.shape).flatten(), ) ).T if self._datacube.stokes_axis else np.vstack( ( xgrid.flatten(), ygrid.flatten(), vgrid.flatten(), ) ).T ) wgrid = self._datacube.wcs.all_pix2world(cgrid, origin) grid_shape = ( self.datacube.n_px_x, self.datacube.n_px_y, self.datacube.n_channels, ) ragrid = wgrid[:, 0].reshape(grid_shape) decgrid = wgrid[:, 1].reshape(grid_shape) chgrid = wgrid[:, 2].reshape(grid_shape) f["RA"] = ragrid f["RA"].attrs["Unit"] = wcs_header["CUNIT1"] f["Dec"] = decgrid f["Dec"].attrs["Unit"] = wcs_header["CUNIT2"] f["channel_mids"] = chgrid f["channel_mids"].attrs["Unit"] = wcs_header["CUNIT3"] # voxel vertex coordinates (for e.g. pyplot.pcolormesh): xgrid_vertices, ygrid_vertices, vgrid_vertices = np.meshgrid( np.arange(self._datacube._array.shape[0] + 1) - 0.5, np.arange(self._datacube._array.shape[1] + 1) - 0.5, np.arange(self._datacube._array.shape[2] + 1) - 0.5, indexing="ij", ) cgrid_vertices = ( np.vstack( ( xgrid_vertices.flatten(), ygrid_vertices.flatten(), vgrid_vertices.flatten(), np.zeros(vgrid_vertices.shape).flatten(), ) ).T if self._datacube.stokes_axis else np.vstack( ( xgrid_vertices.flatten(), ygrid_vertices.flatten(), vgrid_vertices.flatten(), ) ).T ) wgrid_vertices = self._datacube.wcs.all_pix2world(cgrid_vertices, origin) vertices_grid_shape = ( self.datacube.n_px_x + 1, self.datacube.n_px_y + 1, self.datacube.n_channels + 1, ) ragrid_vertices = wgrid_vertices[:, 0].reshape(vertices_grid_shape) decgrid_vertices = wgrid_vertices[:, 1].reshape(vertices_grid_shape) chgrid_vertices = wgrid_vertices[:, 2].reshape(vertices_grid_shape) f["RA_vertices"] = ragrid_vertices f["RA_vertices"].attrs["Unit"] = wcs_header["CUNIT1"] f["Dec_vertices"] = decgrid_vertices f["Dec_vertices"].attrs["Unit"] = wcs_header["CUNIT2"] f["channel_vertices"] = chgrid_vertices f["channel_vertices"].attrs["Unit"] = wcs_header["CUNIT3"] # channels: for dataset_name in ( "velocity_channel_mids", "velocity_channel_edges", "frequency_channel_mids", "frequency_channel_edges", ): f[dataset_name] = getattr(self._datacube, dataset_name) f[dataset_name].attrs["Unit"] = str( getattr(self._datacube, dataset_name).unit ) c.attrs["AxisOrder"] = "(RA,Dec,Channels)" c.attrs["FluxCubeUnit"] = str(self._datacube._array.unit) c.attrs["deltaRA_in_RAUnit"] = wcs_header["CDELT1"] c.attrs["RA0_in_px"] = wcs_header["CRPIX1"] - 1 c.attrs["RA0_in_RAUnit"] = wcs_header["CRVAL1"] c.attrs["RAUnit"] = wcs_header["CUNIT1"] c.attrs["RAProjType"] = wcs_header["CTYPE1"] c.attrs["deltaDec_in_DecUnit"] = wcs_header["CDELT2"] c.attrs["Dec0_in_px"] = wcs_header["CRPIX2"] - 1 c.attrs["Dec0_in_DecUnit"] = wcs_header["CRVAL2"] c.attrs["DecUnit"] = wcs_header["CUNIT2"] c.attrs["DecProjType"] = wcs_header["CTYPE2"] c.attrs["deltaV_in_VUnit"] = wcs_header["CDELT3"] c.attrs["V0_in_px"] = wcs_header["CRPIX3"] - 1 c.attrs["V0_in_VUnit"] = wcs_header["CRVAL3"] c.attrs["VUnit"] = wcs_header["CUNIT3"] c.attrs["VProjType"] = wcs_header["CTYPE3"] c.attrs["SpecSys"] = wcs_header["SPECSYS"] if self.beam is not None: c.attrs["BeamPA"] = self.beam.bpa.to_value(U.deg) c.attrs["BeamMajor_in_deg"] = self.beam.bmaj.to_value(U.deg) c.attrs["BeamMinor_in_deg"] = self.beam.bmin.to_value(U.deg) c.attrs["DateCreated"] = str(Time.now()) c.attrs["MartiniVersion"] = martini_version c.attrs["AstropyVersion"] = astropy_version if self.beam is not None: if self.beam.kernel is None: raise ValueError( "Martini.write_hdf5: Called with beam present but beam kernel" " uninitialized." ) beam_kernel_units = self.beam.kernel.unit f["Beam"] = self.beam.kernel.to_value(beam_kernel_units)[..., np.newaxis] b = f["Beam"] b.attrs["BeamUnit"] = self.beam.kernel.unit.to_string("fits") b.attrs["deltaRA_in_RAUnit"] = wcs_header["CDELT1"] b.attrs["RA0_in_px"] = self.beam.kernel.shape[0] // 2 b.attrs["RA0_in_RAUnit"] = wcs_header["CRVAL1"] b.attrs["RAUnit"] = wcs_header["CUNIT1"] b.attrs["RAProjType"] = wcs_header["CTYPE1"] b.attrs["deltaDec_in_DecUnit"] = wcs_header["CDELT2"] b.attrs["Dec0_in_px"] = self.beam.kernel.shape[1] // 2 b.attrs["Dec0_in_DecUnit"] = wcs_header["CRVAL2"] b.attrs["DecUnit"] = wcs_header["CUNIT2"] b.attrs["DecProjType"] = wcs_header["CTYPE2"] b.attrs["deltaV_in_VUnit"] = wcs_header["CDELT3"] b.attrs["V0_in_px"] = 0 b.attrs["V0_in_VUnit"] = wcs_header["CRVAL3"] b.attrs["VUnit"] = wcs_header["CUNIT3"] b.attrs["VProjType"] = wcs_header["CTYPE3"] b.attrs["BeamPA"] = self.beam.bpa.to_value(U.deg) b.attrs["BeamMajor_in_deg"] = self.beam.bmaj.to_value(U.deg) b.attrs["BeamMinor_in_deg"] = self.beam.bmin.to_value(U.deg) b.attrs["DateCreated"] = str(Time.now()) b.attrs["MartiniVersion"] = martini_version b.attrs["AstropyVersion"] = astropy_version if memmap: return f else: f.close() return None
[docs] class GlobalProfile(_BaseMartini): """ Just produce a global spectrum of an HI source. A simplified version of the main :class:`~martini.martini.Martini` class for those times when you just want a spectrum. Sometimes only the spatially integrated spectrum of a source is wanted, which makes many parts of the standard MARTINI process unnecessary. This class streamlines the process of producing a spatially-integrated spectrum, or "global profile", with some specific assumptions: - The :class:`~martini.martini.GlobalProfile` class does not assume any spatial aperture, instead every particle in the source contributes to the spectrum (unless it falls outside of the spectral bandwidth). - The positions of particles are still used to calculate the line-of-sight vector and the velocity along this direction. There is therefore no need or way to specify a beam or SPH kernel as with the main :class:`~martini.martini.Martini` class. It is also not possible to use MARTINI's noise modules with this class. If these restrictions are found to be too limiting (for example, if the spectrum within a spatial mask defined by a signal-to-noise or other cut is desired), the best course of action is to produce a spatially-resolved mock observation and derive the spectrum from those data as would be done with "real" observations. This class is mainly intended to efficiently provide a "quick look" at the spectrum, or a reference "ideal" spectrum. Parameters ---------- source : ~martini.sources.sph_source.SPHSource An instance of a class derived from :class:`~martini.sources.sph_source.SPHSource`. A description of the HI emitting object, including position, geometry and an interface to the simulation data (SPH particle masses, positions, etc.). See :doc:`sub-module documentation </sources/index>`. spectral_model : ~martini.spectral_models._BaseSpectrum An instance of a class derived from :class:`~martini.spectral_models._BaseSpectrum`. A description of the HI line produced by a particle of given properties. A Dirac-delta spectrum, and both fixed-width and temperature-dependent Gaussian line models are provided; implementing other models is straightforward. See :doc:`sub-module documentation </spectral_models/index>`. n_channels : int Number of channels along the spectral axis. channel_width : ~astropy.units.Quantity :class:`~astropy.units.Quantity`, with dimensions of velocity or frequency. Step size along the spectral axis. Can be provided as a velocity or a frequency. spectral_centre : ~astropy.units.Quantity, optional :class:`~astropy.units.Quantity` with dimensions of velocity or frequency. Velocity (or frequency) of the centre along the spectral axis. Defaults to 0 km/s, but should normally be set to the systemic velocity of the source. quiet : bool, optional If ``True``, suppress output to stdout. channels : str, deprecated Deprecated, channels and their units now fixed at :class:`~martini.datacube.DataCube` initialization. See Also -------- martini.sources.sph_source.SPHSource martini.spectral_models martini.martini.Martini Examples -------- :: # ------make a toy galaxy---------- N = 500 phi = np.random.rand(N) * 2 * np.pi r = [] for L in np.random.rand(N): def f(r): return L - 0.5 * (2 - np.exp(-r) * (np.power(r, 2) + 2 * r + 2)) r.append(fsolve(f, 1.0)[0]) r = np.array(r) # exponential disk r *= 3 / np.sort(r)[N // 2] z = -np.log(np.random.rand(N)) # exponential scale height z *= 0.5 / np.sort(z)[N // 2] * np.sign(np.random.rand(N) - 0.5) x = r * np.cos(phi) y = r * np.sin(phi) xyz_g = np.vstack((x, y, z)) * U.kpc # linear rotation curve vphi = 100 * r / 6.0 vx = -vphi * np.sin(phi) vy = vphi * np.cos(phi) # small pure random z velocities vz = (np.random.rand(N) * 2.0 - 1.0) * 5 vxyz_g = np.vstack((vx, vy, vz)) * U.km * U.s**-1 T_g = np.ones(N) * 8e3 * U.K mHI_g = np.ones(N) / N * 5.0e9 * U.Msun # ~mean interparticle spacing smoothing hsm_g = np.ones(N) * 4 / np.sqrt(N) * U.kpc # --------------------------------- source = SPHSource( distance=3.0 * U.Mpc, rotation={"L_coords": (60.0 * U.deg, 0.0 * U.deg)}, ra=0.0 * U.deg, dec=0.0 * U.deg, h=0.7, T_g=T_g, mHI_g=mHI_g, xyz_g=xyz_g, vxyz_g=vxyz_g, hsm_g=hsm_g, ) spectral_model = GaussianSpectrum(sigma=7 * U.km * U.s**-1) M = GlobalProfile( source=source, spectral_model=spectral_model, n_channels=32, channel_width=10 * U.km * U.s**-1, spectral_centre=source.vsys, ) # spectrum and channel edges and centres can be accessed as: M.spectrum # spectrum is evaluated at first access if not already done M.channel_edges M.channel_mids # the spectrum can be explicitly evaluated with: M.insert_source_in_spectrum() """ def __init__( self, *, source: SPHSource, spectral_model: _BaseSpectrum, n_channels: int, channel_width: U.Quantity[U.km / U.s] | U.Quantity[U.Hz], spectral_centre: U.Quantity[U.km / U.s] | U.Quantity[U.Hz] = 0 * U.km * U.s**-1, quiet: bool = False, channels: None = None, # deprecated ) -> None: if channels is not None: # pragma: no cover warnings.warn( DeprecationWarning( "The `channels` argument to `GlobalProfile.__init__` is deprecated" " and has been ignored. If `channel_width` has velocity units" " channels are evenly spaced in velocity, and if it has frequency" " units they are evenly spaced in frequency." ) ) super().__init__( source=source, datacube=_GlobalProfileDataCube( n_channels=n_channels, channel_width=channel_width, spectral_centre=spectral_centre, ), beam=None, noise=None, sph_kernel=DiracDeltaKernel(size_in_fwhm=np.inf), spectral_model=spectral_model, _prune_kwargs={ "spatial": False, "spectral": True, "mass": True, "obj_type_str": "spectrum", }, quiet=quiet, ) self.source.pixcoords[:2] = 0 return
[docs] def insert_source_in_spectrum(self) -> None: """ Populate the :class:`~martini.datacube.DataCube` with flux from source particles. For the :class:`~martini.martini.GlobalProfile` class we assume that every particle in the source contributes (if it falls in the spectral bandwidth) regardless of position on the sky. The line-of-sight vector still depends on the particle positions, so the direction to the individual particles is still taken into account. """ # skip_validation=True: all particles can contribute their kernel to the pixel; # ncpu=1 since we have 1 pixel and source insertion is parallel over pixels; # no progressbar since there's only 1 pixel of progress; # quiet=True because messages assume a resolved source, replace with new ones super()._insert_source_in_cube( skip_validation=True, progressbar=False, ncpu=1, quiet=True ) # The datacube in Jy/arcsec^2 is a bit misleading because the source is # (presumably) completely unresolved so extrapolating its surface brightness # across the entire pixel is incorrect. Correctly integrate out spatial # information and convert to Jy: self._spectrum = ( (self._datacube._array.squeeze()).to( U.Jy / U.pix**2, equivalencies=[self._datacube.arcsec2_to_pix] ) * U.pix**2 ).to(U.Jy) if not self.quiet: # Need a slightly different calculation for a completely unresolved source. inserted_flux_density = self.spectrum.sum() inserted_mass = ( 2.36e5 * U.Msun * self.source.distance.to_value(U.Mpc) ** 2 * inserted_flux_density.to_value(U.Jy) * self._datacube.channel_width.to_value(U.km / U.s) ) print( "Source inserted.", f" Flux density in spectrum: {inserted_flux_density:.2e}", f" Mass in spectrum (assuming distance {self.source.distance:.2f}):" f" {inserted_mass:.2e}", f" [{inserted_mass / self.source.input_mass * 100:.0f}%" f" of initial source mass]", sep="\n", )
@property def spectrum(self) -> U.Quantity[U.Jy]: """ The spectrum of the source with spatial information integrated out. The spectrum is calculated when this property is accessed if this hasn't already been done. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of flux density. Spatially-integrated spectrum of the source. """ if not hasattr(self, "_spectrum"): self.insert_source_in_spectrum() return self._spectrum @property def channel_edges(self) -> U.Quantity[U.Hz] | U.Quantity[U.km / U.s]: """ The edges of the channels for the spectrum. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of frequency or velocity. Edges of the channels with units depending on :class:`~martini.martini.GlobalProfile`'s native channel spacing. See Also -------- martini.martini.GlobalProfile.channel_mids martini.martini.GlobalProfile.velocity_channel_mids martini.martini.GlobalProfile.velocity_channel_edges martini.martini.GlobalProfile.frequency_channel_mids martini.martini.GlobalProfile.frequency_channel_edges """ return self._datacube.channel_edges @property def channel_mids(self) -> U.Quantity[U.Hz] | U.Quantity[U.km / U.s]: """ The centres of the channels for the spectrum. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of frequency or velocity. Edges of the channels with units depending on :class:`~martini.martini.GlobalProfile`'s native channel spacing. See Also -------- martini.martini.GlobalProfile.channel_edges martini.martini.GlobalProfile.velocity_channel_mids martini.martini.GlobalProfile.velocity_channel_edges martini.martini.GlobalProfile.frequency_channel_mids martini.martini.GlobalProfile.frequency_channel_edges """ return self._datacube.channel_mids @property def frequency_channel_edges(self) -> U.Quantity[U.Hz]: """ The edges of the frequency channels for the spectrum. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of frequency. Edges of the channels with frequency units. See Also -------- martini.martini.GlobalProfile.velocity_channel_mids martini.martini.GlobalProfile.velocity_channel_edges martini.martini.GlobalProfile.frequency_channel_mids """ return self._datacube.channel_edges @property def frequency_channel_mids(self) -> U.Quantity[U.Hz]: """ The centres of the frequency channels for the spectrum. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of frequency. Edges of the channels with frequency units. See Also -------- martini.martini.GlobalProfile.velocity_channel_mids martini.martini.GlobalProfile.velocity_channel_edges martini.martini.GlobalProfile.frequency_channel_edges """ return self._datacube.channel_mids @property def velocity_channel_edges(self) -> U.Quantity[U.km / U.s]: """ The edges of the channels for the spectrum in velocity units. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of velocity. Edges of the channels with velocity units. See Also -------- martini.martini.GlobalProfile.velocity_channel_mids martini.martini.GlobalProfile.frequency_channel_mids martini.martini.GlobalProfile.frequency_channel_edges """ return self._datacube.channel_edges @property def velocity_channel_mids(self) -> U.Quantity[U.km / U.s]: """ The centres of the channels for the spectrum. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of velocity. Edges of the channels with velocity units. See Also -------- martini.martini.GlobalProfile.velocity_channel_edges martini.martini.GlobalProfile.frequency_channel_mids martini.martini.GlobalProfile.frequency_channel_edges """ return self._datacube.channel_mids @property def channel_width(self) -> U.Quantity[U.Hz] | U.Quantity[U.km / U.s]: """ The width of the channels for the spectrum. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimentions of frequency or velocity. Width of the channels with units depending on :class:`~martini.martini.GlobalProfile`'s ``channels`` argument. """ return self._datacube.channel_width
[docs] def reset(self) -> None: """Re-initialize the spectrum with zero-values.""" super().reset() if hasattr(self, "_spectrum"): del self._spectrum return
[docs] def plot_spectrum( self, fig: int = 1, title: str = "", channels: str = "velocity", show_vsys: bool = True, save: str | None = None, ) -> "Figure": """ Produce a figure showing the spectrum. Parameters ---------- fig : int, optional Number of the figure in matplotlib, it will be created as ``plt.figure(fig)``. title : str, optional A title for the figure can be provided. channels : str, optional The type of spectral axis for the plot, either ``"velocity"`` or ``"frequency"``. show_vsys : bool, optional If ``True``, draw a vertical line at the source systemic velocity. save : str, optional If provided, the figure is saved using ``plt.savefig(save)``. A `.png` or `.pdf` suffix is recommended. Returns ------- matplotlib.figure.Figure The spectrum :class:`~matplotlib.figure.Figure`. """ import matplotlib.pyplot as plt if channels == "velocity": channel_mids = self.velocity_channel_mids elif channels == "frequency": channel_mids = self.frequency_channel_mids else: raise ValueError( "`plot_spectrum` argument `channels` must be 'velocity' or 'frequency'." ) figure = plt.figure(fig, figsize=(4, 3)) figure.clf() figure.suptitle(title) xunit = {"velocity": U.km * U.s**-1, "frequency": U.GHz}[channels] ax = figure.add_subplot(1, 1, 1) ax.plot( channel_mids.to_value(xunit), self.spectrum.to_value(U.Jy), ls="solid", color="black", lw=2, ) if show_vsys: ax.axvline( self.source.vsys.to_value(xunit), linestyle="dotted", lw=1.5, color="black", ) ax.set_ylabel(r"Flux density $[\mathrm{Jy}]$") if channels == "velocity": ax.set_xlabel(r"Velocity $[\mathrm{km}\,\mathrm{s}^{-1}]$") elif channels == "frequency": ax.set_xlabel(r"Frequency $[\mathrm{GHz}]$") if save is not None: plt.savefig(save) return figure