"""
Create mock observations of HI sources from cosmological hydrodynamical simulations.
Provides :class:`~martini.martini.Martini`, the main class of the package, and a
simplified :class:`~martini.martini.GlobalProfile` class for use when only a spectrum (no
spatial information) is desired.
"""
import warnings
import subprocess
import os
from scipy.signal import fftconvolve
import numpy as np
import astropy.units as U
from astropy.io import fits
from astropy.time import Time
from astropy.coordinates import Angle
from astropy import __version__ as astropy_version
from itertools import product
from .__version__ import __version__ as martini_version
from warnings import warn
from martini.beams import _BaseBeam
from martini.datacube import DataCube, _GlobalProfileDataCube
from martini.sources import SPHSource
from martini.sph_kernels import DiracDeltaKernel, _BaseSPHKernel
from martini.spectral_models import _BaseSpectrum
from martini.noise import _BaseNoise
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import h5py
from matplotlib.figure import Figure
try:
with open(os.devnull, "w") as _devnull:
gc = subprocess.check_output(
["git", "describe", "--always"],
stderr=_devnull,
cwd=os.path.dirname(os.path.realpath(__file__)),
)
except (subprocess.CalledProcessError, FileNotFoundError): # pragma: no cover
gc = b""
else:
martini_version = martini_version + "_commit_" + gc.strip().decode()
class _BaseMartini:
"""
Common methods for the core Martini class and related classes.
Parameters
----------
source : SPHSource
An instance of a class derived from :class:`martini.sources.SPHSource`.
A description of the HI emitting object, including position, geometry
and an interface to the simulation data (SPH particle masses,
positions, etc.). See :doc:`sub-module documentation </sources/index>`.
datacube : ~martini.datacube.DataCube
A :class:`~martini.datacube.DataCube` instance.
A description of the datacube to create, including pixels, channels,
sky position. See :doc:`sub-module documentation </datacube/index>`.
beam : _BaseBeam, optional
An instance of a class derived from `~martini.beams._BaseBeam`.
A description of the beam for the simulated telescope. Given a
description, either mathematical or as an image, the creation of a
custom beam is straightforward. See
:doc:`sub-module documentation </beams/index>`.
noise : _BaseNoise, optional
An instance of a class derived from :class:`~martini.noise._BaseNoise`.
A description of the simulated noise. A simple Gaussian noise model is
provided; implementation of other noise models is straightforward. See
:doc:`sub-module documentation </noise/index>`.
sph_kernel : _BaseSPHKernel
An instance of a class derived from :class:`~martini.sph_kernels._BaseSPHKernel`.
A description of the SPH smoothing kernel. Check simulation
documentation for the kernel used in a particular simulation, and
:doc:`SPH kernel sub-module documentation </sph_kernels/index>` for guidance.
spectral_model : _BaseSpectrum
An instance of a class derived from
:class:`~martini.spectral_models._BaseSpectrum`.
A description of the HI line produced by a particle of given
properties. A Dirac-delta spectrum, and both fixed-width and
temperature-dependent Gaussian line models are provided; implementing
other models is straightforward. See
:doc:`sub-module documentation </spectral_models/index>`.
quiet : bool, optional
If ``True``, suppress output to stdout.
_prune_kwargs : dict
Arguments to pass through to the :meth:`martini.martini.Martini._prune_particles`
function, intended for internal use only.
See Also
--------
martini.sources.sph_source.SPHSource
martini.datacube.DataCube
martini.beams
martini.noise
martini.sph_kernels
martini.spectral_models
martini.martini.GlobalProfile
"""
source: SPHSource
_datacube: DataCube
beam: _BaseBeam | None
noise: _BaseNoise | None
sph_kernel: _BaseSPHKernel
spectral_model: _BaseSpectrum
quiet: bool
def __init__(
self,
*,
source: SPHSource,
datacube: DataCube,
beam: _BaseBeam | None = None,
noise: _BaseNoise | None = None,
sph_kernel: _BaseSPHKernel,
spectral_model: _BaseSpectrum,
quiet: bool = False,
_prune_kwargs: dict = {},
) -> None:
self.quiet = quiet
self.source = source
self._datacube = datacube
self.beam = beam
self.noise = noise
self.sph_kernel = sph_kernel
self.spectral_model = spectral_model
if self.beam is not None:
self.beam.init_kernel(self._datacube)
self._datacube.add_pad(self.beam.needs_pad())
self.source._init_skycoords()
self.source._init_pixcoords(self._datacube) # after datacube is padded
self.sph_kernel._init_sm_lengths(source=self.source, datacube=self._datacube)
self.sph_kernel._init_sm_ranges()
self._prune_particles(
**_prune_kwargs
) # prunes both source, and kernel if applicable
return
def init_spectra(self) -> None:
"""
Explicitly trigger evaluation of particle spectra.
Triggered when :meth:`~martini.martini.Martini.insert_source_in_cube` is called if
not called explicitly before this.
"""
if not self.quiet:
print("Initializing spectra...")
self.spectral_model.init_spectra(self.source, self._datacube)
if not self.quiet:
print("Spectra initialized.")
return
def _prune_particles(
self,
spatial: bool = True,
spectral: bool = True,
mass: bool = True,
obj_type_str: str = "data cube",
) -> None:
"""
Identify and remove particles that cannot contribute to the DataCube.
This helps speed up the later calculations. Assumes the kernel is 0 at
distances greater than the kernel size (which may differ from the
SPH smoothing length).
Parameters
----------
spatial : bool
If ``True``, prune particles that fall outside the spatial aperture.
spectral : bool
If ``True``, prune particles that fall outside the spectral bandwidth.
mass : bool
If ``True``, prune particles that have zero HI mass.
obj_type_str : str
String describing the object to be pruned for messages.
"""
if not self.quiet:
print(
f"Source module contained {self.source.npart} particles with total HI"
f" mass of {self.source.mHI_g.sum() << U.Msun:.2e}."
)
spectrum_half_width = self.spectral_model.half_width(self.source) / np.max(
np.abs(np.diff(self._datacube.velocity_channel_edges))
)
spatial_reject_conditions = (
(
(
self.source.pixcoords[:2] + self.sph_kernel.sm_ranges[np.newaxis]
< 0 * U.pix
).any(axis=0),
self.source.pixcoords[0] - self.sph_kernel.sm_ranges
> (self._datacube.n_px_x + self._datacube.padx * 2) * U.pix,
self.source.pixcoords[1] - self.sph_kernel.sm_ranges
> (self._datacube.n_px_y + self._datacube.pady * 2) * U.pix,
np.isnan(self.source.pixcoords[:2]).any(axis=0),
)
if spatial
else tuple()
)
spectral_reject_conditions = (
(
self.source.pixcoords[2] + 4 * spectrum_half_width * U.pix < 0 * U.pix,
self.source.pixcoords[2] - 4 * spectrum_half_width * U.pix
> self._datacube.n_channels * U.pix,
)
if spectral
else tuple()
)
mass_reject_conditions = (self.source.mHI_g == 0,) if mass else tuple()
accept_mask = np.logical_not(
np.logical_or.reduce(
spatial_reject_conditions
+ spectral_reject_conditions
+ mass_reject_conditions
)
)
self.source.apply_mask(accept_mask)
self.sph_kernel._apply_mask(accept_mask)
if not self.quiet:
print(
f"Pruned particles that will not contribute to {obj_type_str}, "
f"{self.source.npart} particles remaining with total HI mass of "
f"{self.source.mHI_g.sum() << U.Msun:.2e}."
)
return
def _evaluate_pixel_spectrum(
self, ij_px: tuple[int, int]
) -> U.Quantity[U.Jy / U.arcsec**2]:
"""
Add up contributions of particles to the spectrum in a pixel.
This is the main operation in the core loop of MARTINI. It is embarrassingly
parallel. To support parallel excecution we accept storing up to a copy of the
entire (future) datacube in one-pixel pieces. This avoids the need for concurrent
access to the datacube by parallel processes, which would in the simplest case
duplicate a copy of the datacube array per parallel process! In realistic use
cases the memory overhead from the equivalent of a second datacube array should be
minimal - memory-limited applications should be limited by the memory consumed by
particle data, which is not duplicated in parallel execution.
Parameters
----------
ij_px : tuple
A 2-tuple containing integers specifying the indices (i, j) of pixels in the
grid where the spectrum should be calculated.
Returns
-------
~astropy.units.Quantity
:class:`~astropy.units.Quantity` with dimensions of Jy per sq. arcsec.
A 1D array containing the spectrum, whose length must match the length of the
spectral axis of the datacube.
"""
ij = np.array(ij_px)[..., np.newaxis] * U.pix
mask = (
np.abs(ij - self.source.pixcoords[:2]) <= self.sph_kernel.sm_ranges
).all(axis=0)
weights = self.sph_kernel._px_weight(
self.source.pixcoords[:2, mask] - ij, mask=mask
)
assert self.spectral_model.spectra is not None
tmp = self.spectral_model.spectra[mask]
np.multiply(tmp, weights[:, np.newaxis], out=tmp)
insertion_values = np.sum(tmp, axis=-2)
del tmp
return insertion_values
def _insert_source_in_cube(
self,
skip_validation: bool = False,
progressbar: bool | None = None,
ncpu: int = 1,
quiet: bool | None = None,
) -> None:
"""
Populate the :class:`~martini.datacube.DataCube` with flux from source particles.
Parameters
----------
skip_validation : bool, optional
SPH kernel interpolation onto the DataCube is approximated for
increased speed. For some combinations of pixel size, distance
and SPH smoothing length, the approximation may break down. The
kernel class will check whether this will occur and raise a
RuntimeError if so. This validation can be skipped (at the cost
of accuracy!) by setting this parameter True.
progressbar : bool, optional
A progress bar is shown by default. If martini was initialised with
`quiet` set to `True`, progress bars are switched off unless explicitly
turned on.
ncpu : int
Number of processes to use in main source insertion loop. Using more than
one cpu requires the `multiprocess` module (n.b. not the same as
`multiprocessing`).
quiet : bool, optional
If ``True``, suppress output to stdout. If specified, takes precedence over
quiet parameter of class.
"""
if self.spectral_model.spectra is None:
self.init_spectra()
if progressbar is None:
progressbar = not self.quiet
self.sph_kernel._confirm_validation(noraise=skip_validation, quiet=self.quiet)
ij_pxs = list(
product(
np.arange(self._datacube._array.shape[0]),
np.arange(self._datacube._array.shape[1]),
)
)
# figure out which progressbar style to use
from tqdm.autonotebook import tqdm
if ncpu == 1:
self._datacube._array += U.Quantity(
[
self._evaluate_pixel_spectrum(ij_px)
for ij_px in tqdm(ij_pxs, disable=not progressbar)
]
).reshape(self._datacube._array.shape)
else:
from multiprocess.pool import ThreadPool
# not multiprocessing, need serialization from dill not pickle
total = len(ij_pxs)
with ThreadPool(processes=ncpu) as pool:
self._datacube._array += U.Quantity(
list(
tqdm(
pool.imap(
self._evaluate_pixel_spectrum,
ij_pxs,
),
total=total,
disable=not progressbar,
)
)
).reshape(self._datacube._array.shape)
self._datacube._array = self._datacube._array.to(
U.Jy / U.arcsec**2, equivalencies=[self._datacube.arcsec2_to_pix]
)
pad_mask = (
np.s_[
self._datacube.padx : -self._datacube.padx,
self._datacube.pady : -self._datacube.pady,
...,
]
if self._datacube.padx > 0 and self._datacube.pady > 0
else np.s_[...]
)
inserted_flux_density = np.sum(
self._datacube._array[pad_mask] * self._datacube.px_size**2
).to(U.Jy)
inserted_mass = (
2.36e5
* U.Msun
* self.source.distance.to_value(U.Mpc) ** 2
* np.sum(
(self._datacube._array[pad_mask] * self._datacube.px_size**2)
.sum((0, 1))
.squeeze()
.to_value(U.Jy)
* np.abs(
np.diff(self._datacube.velocity_channel_edges.to_value(U.km / U.s))
)
)
)
if (quiet is None and not self.quiet) or (quiet is not None and not quiet):
print(
"Source inserted.",
f" Flux density in cube: {inserted_flux_density:.2e}",
f" Mass in cube (assuming distance {self.source.distance:.2f} and a"
f" spatially resolved source):"
f" {inserted_mass:.2e}",
f" [{inserted_mass / self.source.input_mass * 100 << 1:.0f}%"
f" of initial source mass]",
f" Maximum pixel: {self._datacube._array.max():.2e}",
" Median non-zero pixel:"
f" {np.median(self._datacube._array[self._datacube._array > 0]):.2e}",
sep="\n",
)
return
def reset(self) -> None:
"""Re-initialize the :class:`~martini.datacube.DataCube` with zero-values."""
init_kwargs = {
"n_px_x": self._datacube.n_px_x,
"n_px_y": self._datacube.n_px_y,
"n_channels": self._datacube.n_channels,
"px_size": self._datacube.px_size,
"channel_width": self._datacube.channel_width,
"spectral_centre": self._datacube.spectral_centre,
"ra": self._datacube.ra,
"dec": self._datacube.dec,
"stokes_axis": self._datacube.stokes_axis,
}
self._datacube = DataCube(**init_kwargs)
if self.beam is not None:
self._datacube.add_pad(self.beam.needs_pad())
return
def preview(
self,
max_points: int = 5000,
fig: int = 1,
lim: str | U.Quantity[U.deg] | None = None,
vlim: str | U.Quantity[U.km / U.s] | None = None,
point_scaling: str = "auto",
title: str = "",
save: str | None = None,
) -> "Figure":
"""
Produce a figure showing the source particle coordinates and velocities.
The data cube region is also outlined.
Makes a 3-panel figure showing the projection of the source as it will appear in
the mock observation. The first panel shows the particles in the y-z plane,
coloured by the x-component of velocity (MARTINI projects the source along the
x-axis). The second and third panels are position-velocity diagrams showing the
x-component of velocity against the y and z coordinates, respectively. The red
boxes drawn on the panels show the data cube extent in position and velocity at
the distance and systemic velocity of the source.
Parameters
----------
max_points : int, optional
Maximum number of points to draw per panel, the particles will be randomly
subsampled if the source has more.
fig : int, optional
Number of the figure in matplotlib, it will be created as ``plt.figure(fig)``.
lim : ~astropy.units.Quantity, optional
:class:`~astropy.units.Quantity` with dimensions of angle.
The coordinate axes extend from ``-lim`` to ``lim``. If unspecified, the
maximum angular separation between the source centre defined by its ``ra`` and
``dec`` and the particle sky positions is used. Whether specified or not, the
axes are expanded if necessary to contain the extent of the data cube.
Alternatively, the string ``"datacube"`` can be passed. In this case the axis
limits are set by the extent of the data cube.
vlim : ~astropy.units.Quantity, optional
:class:`~astropy.units.Quantity` with dimensions of speed.
The velocity axes and colour bar extend from ``-vlim`` to ``vlim``. If
unspecified, the maximum absolute velocity of particles in the source is
used. Whether specified or not, the axes are expanded if necessary to contain
the extent of the data cube. Alternatively, ``"datacube"`` can be
passed. In this case the axis limits are set by the extent of the data cube.
point_scaling : str, optional
By default points are scaled in size and transparency according to their HI
mass and the smoothing length (loosely proportional to their surface
densities, but with different scaling to achieve a visually useful plot). For
some sources the automatic scaling may not give useful results, using
``point_scaling="fixed"`` will plot points of constant size without opacity.
title : str, optional
A title for the figure can be provided.
save : str, optional
If provided, the figure is saved using ``plt.savefig(save)``. A `.png` or
`.pdf` suffix is recommended.
Returns
-------
matplotlib.figure.Figure
The preview :class:`~matplotlib.figure.Figure`.
"""
import matplotlib.pyplot as plt
# pass through arguments, except save (which we will do later if desired)
figure = self.source.preview(
max_points=max_points,
fig=fig,
lim=lim if lim != "datacube" else None,
vlim=vlim if vlim != "datacube" else None,
point_scaling=point_scaling,
title=title,
save=None,
)
y_cube = (
Angle(self._datacube.ra)
.wrap_at(self.source.ra + 180 * U.deg)
.to_value(U.deg)
)
z_cube = self._datacube.dec.to_value(U.deg)
v_cube = self._datacube.spectral_centre.to_value(U.km / U.s)
dy_cube = 0.5 * (self._datacube.px_size * self._datacube.n_px_x).to_value(
U.deg
) # half-length
dz_cube = 0.5 * (self._datacube.px_size * self._datacube.n_px_y).to_value(
U.deg
) # half-length
dv_cube = 0.5 * (
self._datacube.channel_width * self._datacube.n_channels
).to_value(U.km / U.s)
if dy_cube > 5:
warn("DataCube RA range > 5deg, preview is very approximate!")
if dz_cube > 5:
warn("DataCube Dec range > 5deg, preview is very approximate!")
if np.abs(self._datacube.dec.to_value(U.deg)) + 0.5 * dz_cube > 85:
warn("DataCube extent within 5deg of pole, preview is very approximate!")
sp1, sp2, sp3, cb = figure.get_axes()
sp1.add_patch(
plt.Rectangle(
(
(y_cube - dy_cube),
(z_cube - dz_cube),
),
2 * dy_cube,
2 * dz_cube,
linewidth=5,
edgecolor="red",
facecolor="None",
)
)
sp2.add_patch(
plt.Rectangle(
(
(y_cube - dy_cube),
(v_cube - dv_cube),
),
2 * dy_cube,
2 * dv_cube,
linewidth=5,
edgecolor="red",
facecolor="None",
)
)
sp3.add_patch(
plt.Rectangle(
(
(z_cube - dz_cube),
(v_cube - dv_cube),
),
2 * dz_cube,
2 * dv_cube,
linewidth=5,
edgecolor="red",
facecolor="None",
)
)
if lim == "datacube":
sp1.set_xlim((y_cube + dy_cube), (y_cube - dy_cube))
sp1.set_ylim((z_cube - dz_cube), (z_cube + dz_cube))
sp2.set_xlim((y_cube + dy_cube), (y_cube - dy_cube))
sp3.set_xlim((z_cube - dz_cube), (z_cube + dz_cube))
elif lim is None:
sp1.set_xlim(
(
max((y_cube + 1.05 * dy_cube), sp1.get_xlim()[0]),
min((y_cube - 1.05 * dy_cube), sp1.get_xlim()[1]),
)
)
sp2.set_xlim(
(
max((y_cube + 1.05 * dy_cube), sp2.get_xlim()[0]),
min((y_cube - 1.05 * dy_cube), sp2.get_xlim()[1]),
)
)
sp1.set_ylim(
(
min((z_cube - 1.05 * dz_cube), sp1.get_ylim()[0]),
max((z_cube + 1.05 * dz_cube), sp1.get_ylim()[1]),
)
)
sp3.set_xlim(
(
min((z_cube - 1.05 * dz_cube), sp3.get_xlim()[0]),
max((z_cube + 1.05 * dz_cube), sp3.get_xlim()[1]),
)
)
if vlim == "datacube":
sp2.set_ylim(
(v_cube - dv_cube),
(v_cube + dv_cube),
)
sp3.set_ylim(
(v_cube - dv_cube),
(v_cube + dv_cube),
)
elif vlim is None:
sp2.set_ylim(
(
min((v_cube - 1.05 * dv_cube), sp2.get_ylim()[0]),
max((v_cube + 1.05 * dv_cube), sp2.get_ylim()[1]),
)
)
sp3.set_ylim(
(
min((v_cube - 1.05 * dv_cube), sp3.get_ylim()[0]),
max((v_cube + 1.05 * dv_cube), sp3.get_ylim()[1]),
)
)
if save is not None:
plt.savefig(save)
return figure
[docs]
class Martini(_BaseMartini):
"""
Creates synthetic HI data cubes from simulation data.
Usual use of martini involves first creating instances of classes from each
of the required and optional sub-modules, then creating a
:class:`~martini.martini.Martini` with these instances as arguments. The object can
then be used to create synthetic observations, usually by calling
:meth:`~martini.martini.Martini.insert_source_in_cube`,
(optionally) :meth:`~martini.martini.Martini.add_noise`, (optionally)
:meth:`~martini.martini.Martini.convolve_beam` and
:meth:`~martini.martini.Martini.write_fits` in order.
Parameters
----------
source : SPHSource
An instance of a class derived from
:class:`~martini.sources.sph_source.SPHSource`.
A description of the HI emitting object, including position, geometry
and an interface to the simulation data (SPH particle masses,
positions, etc.). See :doc:`sub-module documentation </sources/index>`.
datacube : ~martini.datacube.DataCube
A :class:`~martini.datacube.DataCube` instance.
A description of the datacube to create, including pixels, channels,
sky position. See :doc:`sub-module documentation </datacube/index>`.
beam : _BaseBeam, optional
An instance of a class derived from `~martini.beams._BaseBeam`.
A description of the beam for the simulated telescope. Given a
description, either mathematical or as an image, the creation of a
custom beam is straightforward. See
:doc:`sub-module documentation </beams/index>`.
noise : _BaseNoise, optional
An instance of a class derived from :class:`~martini.noise._BaseNoise`.
A description of the simulated noise. A simple Gaussian noise model is
provided; implementation of other noise models is straightforward. See
:doc:`sub-module documentation </noise/index>`.
sph_kernel : _BaseSPHKernel
An instance of a class derived from :class:`~martini.sph_kernels._BaseSPHKernel`.
A description of the SPH smoothing kernel. Check simulation
documentation for the kernel used in a particular simulation, and
:doc:`SPH kernel sub-module documentation </sph_kernels/index>` for guidance.
spectral_model : _BaseSpectrum
An instance of a class derived from
:class:`~martini.spectral_models._BaseSpectrum`.
A description of the HI line produced by a particle of given
properties. A Dirac-delta spectrum, and both fixed-width and
temperature-dependent Gaussian line models are provided; implementing
other models is straightforward. See
:doc:`sub-module documentation </spectral_models/index>`.
quiet : bool, optional
If ``True``, suppress output to stdout.
See Also
--------
martini.sources.sph_source.SPHSource
martini.datacube.DataCube
martini.beams
martini.noise
martini.sph_kernels
martini.spectral_models
martini.martini.GlobalProfile
Examples
--------
More detailed examples can be found in the examples directory in the github
distribution of the package.
The following example illustrates basic use of MARTINI, using a (very!)
crude model of a gas disk. This example can be run by doing
'from martini import demo; demo()'::
# ------make a toy galaxy----------
N = 500
phi = np.random.rand(N) * 2 * np.pi
r = []
for L in np.random.rand(N):
def f(r):
return L - 0.5 * (2 - np.exp(-r) * (np.power(r, 2) + 2 * r + 2))
r.append(fsolve(f, 1.0)[0])
r = np.array(r)
# exponential disk
r *= 3 / np.sort(r)[N // 2]
z = -np.log(np.random.rand(N))
# exponential scale height
z *= 0.5 / np.sort(z)[N // 2] * np.sign(np.random.rand(N) - 0.5)
x = r * np.cos(phi)
y = r * np.sin(phi)
xyz_g = np.vstack((x, y, z)) * U.kpc
# linear rotation curve
vphi = 100 * r / 6.0
vx = -vphi * np.sin(phi)
vy = vphi * np.cos(phi)
# small pure random z velocities
vz = (np.random.rand(N) * 2.0 - 1.0) * 5
vxyz_g = np.vstack((vx, vy, vz)) * U.km * U.s**-1
T_g = np.ones(N) * 8e3 * U.K
mHI_g = np.ones(N) / N * 5.0e9 * U.Msun
# ~mean interparticle spacing smoothing
hsm_g = np.ones(N) * 4 / np.sqrt(N) * U.kpc
# ---------------------------------
source = SPHSource(
distance=3.0 * U.Mpc,
rotation={"L_coords": (60.0 * U.deg, 0.0 * U.deg)},
ra=0.0 * U.deg,
dec=0.0 * U.deg,
h=0.7,
T_g=T_g,
mHI_g=mHI_g,
xyz_g=xyz_g,
vxyz_g=vxyz_g,
hsm_g=hsm_g,
)
datacube = DataCube(
n_px_x=128,
n_px_y=128,
n_channels=32,
px_size=10.0 * U.arcsec,
channel_width=10.0 * U.km * U.s**-1,
spectral_centre=source.vsys,
)
beam = GaussianBeam(
bmaj=30.0 * U.arcsec, bmin=30.0 * U.arcsec, bpa=0.0 * U.deg, truncate=4.0
)
noise = GaussianNoise(rms=3.0e-5 * U.Jy * U.beam**-1)
spectral_model = GaussianSpectrum(sigma=7 * U.km * U.s**-1)
sph_kernel = CubicSplineKernel()
M = Martini(
source=source,
datacube=datacube,
beam=beam,
noise=noise,
spectral_model=spectral_model,
sph_kernel=sph_kernel,
)
M.insert_source_in_cube()
M.add_noise()
M.convolve_beam()
M.write_beam_fits(beamfile)
M.write_fits(cubefile)
print(f"Wrote demo fits output to {cubefile}, and beam image to {beamfile}.")
try:
M.write_hdf5(hdf5file)
except ModuleNotFoundError:
print("h5py package not present, skipping hdf5 output demo.")
else:
print(f"Wrote demo hdf5 output to {hdf5file}.")
"""
def __init__(
self,
*,
source: SPHSource,
datacube: DataCube,
beam: _BaseBeam | None = None,
noise: _BaseNoise | None = None,
sph_kernel: _BaseSPHKernel,
spectral_model: _BaseSpectrum,
quiet: bool = False,
) -> None:
super().__init__(
source=source,
datacube=datacube,
beam=beam,
noise=noise,
sph_kernel=sph_kernel,
spectral_model=spectral_model,
quiet=quiet,
)
return
@property
def datacube(self) -> DataCube:
"""
The :class:`~martini.datacube.DataCube` object for this mock observation.
Returns
-------
~martini.datacube.DataCube
The :class:`~martini.datacube.DataCube` contained by this
:class:`~martini.martini.Martini` instance.
"""
return self._datacube
[docs]
def insert_source_in_cube(
self,
skip_validation: bool = False,
progressbar: bool | None = None,
ncpu: int = 1,
) -> None:
"""
Populate the DataCube with flux from the particles in the source.
Parameters
----------
skip_validation : bool, optional
SPH kernel interpolation onto the :class:`~martini.datacube.DataCube`
is approximated for increased speed. For some combinations of pixel size,
distance and SPH smoothing length, the approximation may break down. The
kernel class will check whether this will occur and raise a
``RuntimeError`` if so. This validation can be skipped (at the cost
of accuracy!) by setting this parameter ``True``.
progressbar : bool, optional
A progress bar is shown by default. Progress bars work, with perhaps
some visual glitches, in parallel. If :class:`~martini.martini.Martini` was
initialised with ``quiet`` set to ``True``, progress bars are switched off
unless explicitly turned on.
ncpu : int
Number of processes to use in main source insertion loop. Using more than
one cpu requires the :mod:`multiprocess` module (n.b. not the same as
``multiprocessing``).
"""
super()._insert_source_in_cube(
skip_validation=skip_validation, progressbar=progressbar, ncpu=ncpu
)
return
[docs]
def convolve_beam(self) -> None:
"""Convolve the beam and data cube."""
if self.beam is None:
warn("Skipping beam convolution, no beam object provided to Martini.")
return
minimum_padding = self.beam.needs_pad()
if (self._datacube.padx < minimum_padding[0]) or (
self._datacube.pady < minimum_padding[1]
):
raise ValueError(
"datacube padding insufficient for beam convolution (perhaps you loaded a"
" datacube state with datacube.load_state that was previously initialized"
" by martini with a smaller beam?)"
)
unit = self._datacube._array.unit
assert self.beam.kernel is not None # placate mypy
for spatial_slice in self._datacube.spatial_slices:
# use a view [...] to force in-place modification
spatial_slice[...] = (
fftconvolve(spatial_slice, self.beam.kernel, mode="same") * unit
)
self._datacube.drop_pad()
self._datacube._array = self._datacube._array.to(
U.Jy * U.beam**-1,
equivalencies=U.beam_angular_area(self.beam.area),
)
if not self.quiet:
print(
"Beam convolved.",
" Data cube RMS after beam convolution:"
f" {np.std(self._datacube._array):.2e}",
f" Maximum pixel: {self._datacube._array.max():.2e}",
" Median non-zero pixel:"
f" {np.median(self._datacube._array[self._datacube._array > 0]):.2e}",
sep="\n",
)
return
[docs]
def add_noise(self) -> None:
"""Insert noise into the data cube."""
if self.noise is None:
warn("Skipping noise, no noise object provided to Martini.")
return
if self.beam is None:
warn(
"Skipping noise, no beam object (required to estimate post-convolution"
" noise level) provided to Martini."
)
return
# this unit conversion means noise can be added before or after source insertion:
noise_cube = (
self.noise.generate(self._datacube, self.beam)
.to(
U.Jy * U.arcsec**-2,
equivalencies=U.beam_angular_area(self.beam.area),
)
.to(
self._datacube._array.unit,
equivalencies=[self._datacube.arcsec2_to_pix],
)
)
self._datacube._array = self._datacube._array + noise_cube
if not self.quiet:
print(
"Noise added.",
f" Noise cube RMS: {np.std(noise_cube):.2e} (before beam convolution).",
" Data cube RMS after noise addition (before beam convolution): "
f"{np.std(self._datacube._array):.2e}",
sep="\n",
)
return
[docs]
def write_fits(
self,
filename: str,
overwrite: bool = True,
obj_name: str = "MOCK",
channels: None = None, # deprecated
dtype: str | np.dtype | None = None,
) -> None:
"""
Output the data cube to a FITS-format file.
Parameters
----------
filename : str
Name of the file to write. ``'.fits'`` will be appended if not already
present in the filename. ``'.fits.gz'`` etc. will be recognized.
overwrite : bool, optional
Whether to allow overwriting existing files.
obj_name : str
Name to write in the ``OBJECT`` FITS header field (max 16 characters).
channels : str, deprecated
Deprecated, channels and their units now fixed at
:class:`~martini.datacube.DataCube` initialization.
dtype : str or dtype
Typecode or data-type to which the array is cast. Should be supported
by fits. Default to not do data type conversion.
"""
if channels is not None: # pragma: no cover
warnings.warn(
DeprecationWarning(
"`channels` argument to `write_fits` ignored, channels and their"
" units now fixed at DataCube initialization."
)
)
self._datacube.drop_pad()
filename = str(filename)
filename = filename + ".fits" if "fits" not in filename.split(".") else filename
wcs_header = self._datacube.wcs.to_header()
wcs_header.rename_keyword("WCSAXES", "NAXIS")
header = fits.Header()
header.append(("SIMPLE", "T"))
header.append(("BITPIX", 16))
header.append(("NAXIS", wcs_header["NAXIS"]))
header.append(("NAXIS1", self._datacube.n_px_x))
header.append(("NAXIS2", self._datacube.n_px_y))
header.append(("NAXIS3", self._datacube.n_channels))
if self._datacube.stokes_axis:
header.append(("NAXIS4", 1))
header.append(("EXTEND", "T"))
header.append(("CDELT1", wcs_header["CDELT1"]))
header.append(("CRPIX1", wcs_header["CRPIX1"]))
header.append(("CRVAL1", wcs_header["CRVAL1"]))
header.append(("CTYPE1", wcs_header["CTYPE1"]))
header.append(("CUNIT1", wcs_header["CUNIT1"]))
header.append(("CDELT2", wcs_header["CDELT2"]))
header.append(("CRPIX2", wcs_header["CRPIX2"]))
header.append(("CRVAL2", wcs_header["CRVAL2"]))
header.append(("CTYPE2", wcs_header["CTYPE2"]))
header.append(("CUNIT2", wcs_header["CUNIT2"]))
header.append(("CDELT3", wcs_header["CDELT3"]))
header.append(("CRPIX3", wcs_header["CRPIX3"]))
header.append(("CRVAL3", wcs_header["CRVAL3"]))
header.append(("CTYPE3", wcs_header["CTYPE3"]))
header.append(("CUNIT3", wcs_header["CUNIT3"]))
if self._datacube.stokes_axis:
header.append(("CDELT4", wcs_header["CDELT4"]))
header.append(("CRPIX4", wcs_header["CRPIX4"]))
header.append(("CRVAL4", wcs_header["CRVAL4"]))
header.append(("CTYPE4", wcs_header["CTYPE4"]))
header.append(("CUNIT4", "PAR"))
header.append(("EPOCH", 2000))
header.append(("INSTRUME", "MARTINI", martini_version))
header.append(("BSCALE", 1.0))
header.append(("BZERO", 0.0))
datacube_array_units = self._datacube._array.unit
header.append(
("DATAMAX", np.max(self._datacube._array.to_value(datacube_array_units)))
)
header.append(
("DATAMIN", np.min(self._datacube._array.to_value(datacube_array_units)))
)
header.append(("ORIGIN", "astropy v" + astropy_version))
# long names break fits format, don't let the user set this
if len(obj_name) > 16:
warnings.warn(
"obj_name longer than 16 characters, truncating", RuntimeWarning
)
obj_name = obj_name[:16]
header.append(("OBJECT", obj_name))
if self.beam is not None:
header.append(("BPA", self.beam.bpa.to_value(U.deg)))
header.append(("OBSERVER", "K. Oman"))
header.append(("BUNIT", datacube_array_units.to_string("fits")))
header.append(("DATE-OBS", Time.now().to_value("fits")))
header.append(("MJD-OBS", Time.now().to_value("mjd")))
if self.beam is not None:
header.append(("BMAJ", self.beam.bmaj.to_value(U.deg)))
header.append(("BMIN", self.beam.bmin.to_value(U.deg)))
header.append(("BTYPE", "Intensity"))
header.append(("SPECSYS", wcs_header["SPECSYS"]))
header.append(("RESTFRQ", wcs_header["RESTFRQ"]))
# flip axes to write
data = self._datacube._array.to_value(datacube_array_units).T
if dtype is not None:
data = data.astype(dtype, copy=False)
hdu = fits.PrimaryHDU(header=header, data=data)
hdu.writeto(filename, overwrite=overwrite)
return
[docs]
def write_beam_fits(
self,
filename: str,
overwrite: bool = True,
channels: None = None, # deprecated
) -> None:
"""
Output the beam to a FITS-format file.
The beam is written to file, with pixel sizes, coordinate system, etc.
similar to those used for the :class:`~martini.datacube.DataCube`.
Parameters
----------
filename : str
Name of the file to write. ``".fits"`` will be appended if not already
present.
overwrite : bool, optional
Whether to allow overwriting existing files.
channels : str, deprecated
Deprecated, channels and their units now fixed at
:class:`~martini.datacube.DataCube` initialization.
Raises
------
ValueError
If :class:`~martini.martini.Martini` was initialized without a ``beam``.
"""
if channels is not None: # pragma: no cover
warnings.warn(
DeprecationWarning(
"`channels` argument to `write_fits` ignored, channels and their"
" units now fixed at DataCube initialization."
)
)
if self.beam is None:
raise ValueError("Martini.write_beam_fits: Called with beam set to 'None'.")
assert self.beam.kernel is not None
filename = filename if filename[-5:] == ".fits" else filename + ".fits"
wcs_header = self._datacube.wcs.to_header()
beam_kernel_units = self.beam.kernel.unit
header = fits.Header()
header.append(("SIMPLE", "T"))
header.append(("BITPIX", 16))
header.append(("NAXIS", 3))
header.append(("NAXIS1", self.beam.kernel.shape[0]))
header.append(("NAXIS2", self.beam.kernel.shape[1]))
header.append(("NAXIS3", 1))
header.append(("EXTEND", "T"))
header.append(("BSCALE", 1.0))
header.append(("BZERO", 0.0))
# this is 1/arcsec^2, is this right?
header.append(("BUNIT", beam_kernel_units.to_string("fits")))
header.append(("CRPIX1", self.beam.kernel.shape[0] // 2 + 1))
header.append(("CDELT1", wcs_header["CDELT1"]))
header.append(("CRVAL1", wcs_header["CRVAL1"]))
header.append(("CTYPE1", wcs_header["CTYPE1"]))
header.append(("CUNIT1", wcs_header["CUNIT1"]))
header.append(("CRPIX2", self.beam.kernel.shape[1] // 2 + 1))
header.append(("CDELT2", wcs_header["CDELT2"]))
header.append(("CRVAL2", wcs_header["CRVAL2"]))
header.append(("CTYPE2", wcs_header["CTYPE2"]))
header.append(("CUNIT2", wcs_header["CUNIT2"]))
header.append(("CRPIX3", 1))
header.append(("CDELT3", wcs_header["CDELT3"]))
header.append(("CRVAL3", wcs_header["CRVAL3"]))
header.append(("CTYPE3", wcs_header["CTYPE3"]))
header.append(("CUNIT3", wcs_header["CUNIT3"]))
header.append(("SPECSYS", wcs_header["SPECSYS"]))
header.append(("BMAJ", self.beam.bmaj.to_value(U.deg)))
header.append(("BMIN", self.beam.bmin.to_value(U.deg)))
header.append(("BPA", self.beam.bpa.to_value(U.deg)))
header.append(("BTYPE", "beam "))
header.append(("EPOCH", 2000))
header.append(("OBSERVER", "K. Oman"))
# long names break fits format
header.append(("OBJECT", "MOCKBEAM"))
header.append(("INSTRUME", "MARTINI", martini_version))
header.append(("DATAMAX", np.max(self.beam.kernel.to_value(beam_kernel_units))))
header.append(("DATAMIN", np.min(self.beam.kernel.to_value(beam_kernel_units))))
header.append(("ORIGIN", "astropy v" + astropy_version))
# flip axes to write
hdu = fits.PrimaryHDU(
header=header,
data=self.beam.kernel.to_value(beam_kernel_units)[..., np.newaxis].T,
)
hdu.writeto(filename, overwrite=True)
return
[docs]
def write_hdf5(
self,
filename: str,
overwrite: bool = True,
memmap: bool = False,
compact: bool = False,
channels: None = None, # deprecated
) -> "h5py.File | None":
"""
Output the data cube and beam to a HDF5-format file.
Requires the :mod:`h5py` package.
Parameters
----------
filename : str
Name of the file to write. ``'.hdf5'`` will be appended if not already
present.
overwrite : bool, optional
Whether to allow overwriting existing files.
memmap : bool, optional
If ``True``, create a file-like object in memory and return it instead
of writing file to disk.
compact : bool, optional
If ``True``, omit pixel coordinate arrays to save disk space. In this
case pixel coordinates can still be reconstructed from FITS-style
keywords stored in the FluxCube attributes.
channels : str, deprecated
Deprecated, channels and their units now fixed at
:class:`~martini.datacube.DataCube` initialization.
"""
if channels is not None: # pragma: no cover
warnings.warn(
DeprecationWarning(
"`channels` argument to `write_fits` ignored, channels and their"
" units now fixed at DataCube initialization."
)
)
import h5py
self._datacube.drop_pad()
filename = filename if filename[-5:] == ".hdf5" else filename + ".hdf5"
wcs_header = self._datacube.wcs.to_header()
mode = "w" if overwrite else "x"
driver = "core" if memmap else None
h5_kwargs = {"backing_store": False} if memmap else {}
f = h5py.File(filename, mode, driver=driver, **h5_kwargs)
datacube_array_units = self._datacube._array.unit
f["FluxCube"] = self._datacube._array.to_value(datacube_array_units).squeeze()
c = f["FluxCube"]
origin = 0 # index from 0 like numpy, not from 1
if not compact:
# voxel centre coordinates:
xgrid, ygrid, vgrid = np.meshgrid(
np.arange(self._datacube._array.shape[0]),
np.arange(self._datacube._array.shape[1]),
np.arange(self._datacube._array.shape[2]),
indexing="ij",
)
cgrid = (
np.vstack(
(
xgrid.flatten(),
ygrid.flatten(),
vgrid.flatten(),
np.zeros(vgrid.shape).flatten(),
)
).T
if self._datacube.stokes_axis
else np.vstack(
(
xgrid.flatten(),
ygrid.flatten(),
vgrid.flatten(),
)
).T
)
wgrid = self._datacube.wcs.all_pix2world(cgrid, origin)
grid_shape = (
self.datacube.n_px_x,
self.datacube.n_px_y,
self.datacube.n_channels,
)
ragrid = wgrid[:, 0].reshape(grid_shape)
decgrid = wgrid[:, 1].reshape(grid_shape)
chgrid = wgrid[:, 2].reshape(grid_shape)
f["RA"] = ragrid
f["RA"].attrs["Unit"] = wcs_header["CUNIT1"]
f["Dec"] = decgrid
f["Dec"].attrs["Unit"] = wcs_header["CUNIT2"]
f["channel_mids"] = chgrid
f["channel_mids"].attrs["Unit"] = wcs_header["CUNIT3"]
# voxel vertex coordinates (for e.g. pyplot.pcolormesh):
xgrid_vertices, ygrid_vertices, vgrid_vertices = np.meshgrid(
np.arange(self._datacube._array.shape[0] + 1) - 0.5,
np.arange(self._datacube._array.shape[1] + 1) - 0.5,
np.arange(self._datacube._array.shape[2] + 1) - 0.5,
indexing="ij",
)
cgrid_vertices = (
np.vstack(
(
xgrid_vertices.flatten(),
ygrid_vertices.flatten(),
vgrid_vertices.flatten(),
np.zeros(vgrid_vertices.shape).flatten(),
)
).T
if self._datacube.stokes_axis
else np.vstack(
(
xgrid_vertices.flatten(),
ygrid_vertices.flatten(),
vgrid_vertices.flatten(),
)
).T
)
wgrid_vertices = self._datacube.wcs.all_pix2world(cgrid_vertices, origin)
vertices_grid_shape = (
self.datacube.n_px_x + 1,
self.datacube.n_px_y + 1,
self.datacube.n_channels + 1,
)
ragrid_vertices = wgrid_vertices[:, 0].reshape(vertices_grid_shape)
decgrid_vertices = wgrid_vertices[:, 1].reshape(vertices_grid_shape)
chgrid_vertices = wgrid_vertices[:, 2].reshape(vertices_grid_shape)
f["RA_vertices"] = ragrid_vertices
f["RA_vertices"].attrs["Unit"] = wcs_header["CUNIT1"]
f["Dec_vertices"] = decgrid_vertices
f["Dec_vertices"].attrs["Unit"] = wcs_header["CUNIT2"]
f["channel_vertices"] = chgrid_vertices
f["channel_vertices"].attrs["Unit"] = wcs_header["CUNIT3"]
# channels:
for dataset_name in (
"velocity_channel_mids",
"velocity_channel_edges",
"frequency_channel_mids",
"frequency_channel_edges",
):
f[dataset_name] = getattr(self._datacube, dataset_name)
f[dataset_name].attrs["Unit"] = str(
getattr(self._datacube, dataset_name).unit
)
c.attrs["AxisOrder"] = "(RA,Dec,Channels)"
c.attrs["FluxCubeUnit"] = str(self._datacube._array.unit)
c.attrs["deltaRA_in_RAUnit"] = wcs_header["CDELT1"]
c.attrs["RA0_in_px"] = wcs_header["CRPIX1"] - 1
c.attrs["RA0_in_RAUnit"] = wcs_header["CRVAL1"]
c.attrs["RAUnit"] = wcs_header["CUNIT1"]
c.attrs["RAProjType"] = wcs_header["CTYPE1"]
c.attrs["deltaDec_in_DecUnit"] = wcs_header["CDELT2"]
c.attrs["Dec0_in_px"] = wcs_header["CRPIX2"] - 1
c.attrs["Dec0_in_DecUnit"] = wcs_header["CRVAL2"]
c.attrs["DecUnit"] = wcs_header["CUNIT2"]
c.attrs["DecProjType"] = wcs_header["CTYPE2"]
c.attrs["deltaV_in_VUnit"] = wcs_header["CDELT3"]
c.attrs["V0_in_px"] = wcs_header["CRPIX3"] - 1
c.attrs["V0_in_VUnit"] = wcs_header["CRVAL3"]
c.attrs["VUnit"] = wcs_header["CUNIT3"]
c.attrs["VProjType"] = wcs_header["CTYPE3"]
c.attrs["SpecSys"] = wcs_header["SPECSYS"]
if self.beam is not None:
c.attrs["BeamPA"] = self.beam.bpa.to_value(U.deg)
c.attrs["BeamMajor_in_deg"] = self.beam.bmaj.to_value(U.deg)
c.attrs["BeamMinor_in_deg"] = self.beam.bmin.to_value(U.deg)
c.attrs["DateCreated"] = str(Time.now())
c.attrs["MartiniVersion"] = martini_version
c.attrs["AstropyVersion"] = astropy_version
if self.beam is not None:
if self.beam.kernel is None:
raise ValueError(
"Martini.write_hdf5: Called with beam present but beam kernel"
" uninitialized."
)
beam_kernel_units = self.beam.kernel.unit
f["Beam"] = self.beam.kernel.to_value(beam_kernel_units)[..., np.newaxis]
b = f["Beam"]
b.attrs["BeamUnit"] = self.beam.kernel.unit.to_string("fits")
b.attrs["deltaRA_in_RAUnit"] = wcs_header["CDELT1"]
b.attrs["RA0_in_px"] = self.beam.kernel.shape[0] // 2
b.attrs["RA0_in_RAUnit"] = wcs_header["CRVAL1"]
b.attrs["RAUnit"] = wcs_header["CUNIT1"]
b.attrs["RAProjType"] = wcs_header["CTYPE1"]
b.attrs["deltaDec_in_DecUnit"] = wcs_header["CDELT2"]
b.attrs["Dec0_in_px"] = self.beam.kernel.shape[1] // 2
b.attrs["Dec0_in_DecUnit"] = wcs_header["CRVAL2"]
b.attrs["DecUnit"] = wcs_header["CUNIT2"]
b.attrs["DecProjType"] = wcs_header["CTYPE2"]
b.attrs["deltaV_in_VUnit"] = wcs_header["CDELT3"]
b.attrs["V0_in_px"] = 0
b.attrs["V0_in_VUnit"] = wcs_header["CRVAL3"]
b.attrs["VUnit"] = wcs_header["CUNIT3"]
b.attrs["VProjType"] = wcs_header["CTYPE3"]
b.attrs["BeamPA"] = self.beam.bpa.to_value(U.deg)
b.attrs["BeamMajor_in_deg"] = self.beam.bmaj.to_value(U.deg)
b.attrs["BeamMinor_in_deg"] = self.beam.bmin.to_value(U.deg)
b.attrs["DateCreated"] = str(Time.now())
b.attrs["MartiniVersion"] = martini_version
b.attrs["AstropyVersion"] = astropy_version
if memmap:
return f
else:
f.close()
return None
[docs]
class GlobalProfile(_BaseMartini):
"""
Just produce a global spectrum of an HI source.
A simplified version of the main :class:`~martini.martini.Martini` class for those
times when you just want a spectrum.
Sometimes only the spatially integrated spectrum of a source is wanted, which makes
many parts of the standard MARTINI process unnecessary. This class streamlines the
process of producing a spatially-integrated spectrum, or "global profile", with some
specific assumptions:
- The :class:`~martini.martini.GlobalProfile` class does not assume any spatial
aperture, instead every particle in the source contributes to the spectrum (unless
it falls outside of the spectral bandwidth).
- The positions of particles are still used to calculate the line-of-sight vector
and the velocity along this direction.
There is therefore no need or way to specify a beam or SPH kernel as with the main
:class:`~martini.martini.Martini` class. It is also not possible to use MARTINI's
noise modules with this class. If these restrictions are found to be too limiting
(for example, if the spectrum within a spatial mask defined by a signal-to-noise or
other cut is desired), the best course of action is to produce a spatially-resolved
mock observation and derive the spectrum from those data as would be done with "real"
observations. This class is mainly intended to efficiently provide a "quick look" at
the spectrum, or a reference "ideal" spectrum.
Parameters
----------
source : ~martini.sources.sph_source.SPHSource
An instance of a class derived from
:class:`~martini.sources.sph_source.SPHSource`.
A description of the HI emitting object, including position, geometry
and an interface to the simulation data (SPH particle masses,
positions, etc.). See :doc:`sub-module documentation </sources/index>`.
spectral_model : ~martini.spectral_models._BaseSpectrum
An instance of a class derived from
:class:`~martini.spectral_models._BaseSpectrum`.
A description of the HI line produced by a particle of given
properties. A Dirac-delta spectrum, and both fixed-width and
temperature-dependent Gaussian line models are provided; implementing
other models is straightforward. See
:doc:`sub-module documentation </spectral_models/index>`.
n_channels : int
Number of channels along the spectral axis.
channel_width : ~astropy.units.Quantity
:class:`~astropy.units.Quantity`, with dimensions of velocity or frequency.
Step size along the spectral axis. Can be provided as a velocity or a
frequency.
spectral_centre : ~astropy.units.Quantity, optional
:class:`~astropy.units.Quantity` with dimensions of velocity or frequency.
Velocity (or frequency) of the centre along the spectral axis. Defaults to 0 km/s,
but should normally be set to the systemic velocity of the source.
quiet : bool, optional
If ``True``, suppress output to stdout.
channels : str, deprecated
Deprecated, channels and their units now fixed at
:class:`~martini.datacube.DataCube` initialization.
See Also
--------
martini.sources.sph_source.SPHSource
martini.spectral_models
martini.martini.Martini
Examples
--------
::
# ------make a toy galaxy----------
N = 500
phi = np.random.rand(N) * 2 * np.pi
r = []
for L in np.random.rand(N):
def f(r):
return L - 0.5 * (2 - np.exp(-r) * (np.power(r, 2) + 2 * r + 2))
r.append(fsolve(f, 1.0)[0])
r = np.array(r)
# exponential disk
r *= 3 / np.sort(r)[N // 2]
z = -np.log(np.random.rand(N))
# exponential scale height
z *= 0.5 / np.sort(z)[N // 2] * np.sign(np.random.rand(N) - 0.5)
x = r * np.cos(phi)
y = r * np.sin(phi)
xyz_g = np.vstack((x, y, z)) * U.kpc
# linear rotation curve
vphi = 100 * r / 6.0
vx = -vphi * np.sin(phi)
vy = vphi * np.cos(phi)
# small pure random z velocities
vz = (np.random.rand(N) * 2.0 - 1.0) * 5
vxyz_g = np.vstack((vx, vy, vz)) * U.km * U.s**-1
T_g = np.ones(N) * 8e3 * U.K
mHI_g = np.ones(N) / N * 5.0e9 * U.Msun
# ~mean interparticle spacing smoothing
hsm_g = np.ones(N) * 4 / np.sqrt(N) * U.kpc
# ---------------------------------
source = SPHSource(
distance=3.0 * U.Mpc,
rotation={"L_coords": (60.0 * U.deg, 0.0 * U.deg)},
ra=0.0 * U.deg,
dec=0.0 * U.deg,
h=0.7,
T_g=T_g,
mHI_g=mHI_g,
xyz_g=xyz_g,
vxyz_g=vxyz_g,
hsm_g=hsm_g,
)
spectral_model = GaussianSpectrum(sigma=7 * U.km * U.s**-1)
M = GlobalProfile(
source=source,
spectral_model=spectral_model,
n_channels=32,
channel_width=10 * U.km * U.s**-1,
spectral_centre=source.vsys,
)
# spectrum and channel edges and centres can be accessed as:
M.spectrum # spectrum is evaluated at first access if not already done
M.channel_edges
M.channel_mids
# the spectrum can be explicitly evaluated with:
M.insert_source_in_spectrum()
"""
def __init__(
self,
*,
source: SPHSource,
spectral_model: _BaseSpectrum,
n_channels: int,
channel_width: U.Quantity[U.km / U.s] | U.Quantity[U.Hz],
spectral_centre: U.Quantity[U.km / U.s] | U.Quantity[U.Hz] = 0 * U.km * U.s**-1,
quiet: bool = False,
channels: None = None, # deprecated
) -> None:
if channels is not None: # pragma: no cover
warnings.warn(
DeprecationWarning(
"The `channels` argument to `GlobalProfile.__init__` is deprecated"
" and has been ignored. If `channel_width` has velocity units"
" channels are evenly spaced in velocity, and if it has frequency"
" units they are evenly spaced in frequency."
)
)
super().__init__(
source=source,
datacube=_GlobalProfileDataCube(
n_channels=n_channels,
channel_width=channel_width,
spectral_centre=spectral_centre,
),
beam=None,
noise=None,
sph_kernel=DiracDeltaKernel(size_in_fwhm=np.inf),
spectral_model=spectral_model,
_prune_kwargs={
"spatial": False,
"spectral": True,
"mass": True,
"obj_type_str": "spectrum",
},
quiet=quiet,
)
self.source.pixcoords[:2] = 0
return
[docs]
def insert_source_in_spectrum(self) -> None:
"""
Populate the :class:`~martini.datacube.DataCube` with flux from source particles.
For the :class:`~martini.martini.GlobalProfile` class we assume that every
particle in the source contributes (if it falls in the spectral bandwidth)
regardless of position on the sky. The line-of-sight vector still depends on
the particle positions, so the direction to the individual particles is still
taken into account.
"""
# skip_validation=True: all particles can contribute their kernel to the pixel;
# ncpu=1 since we have 1 pixel and source insertion is parallel over pixels;
# no progressbar since there's only 1 pixel of progress;
# quiet=True because messages assume a resolved source, replace with new ones
super()._insert_source_in_cube(
skip_validation=True, progressbar=False, ncpu=1, quiet=True
)
# The datacube in Jy/arcsec^2 is a bit misleading because the source is
# (presumably) completely unresolved so extrapolating its surface brightness
# across the entire pixel is incorrect. Correctly integrate out spatial
# information and convert to Jy:
self._spectrum = (
(self._datacube._array.squeeze()).to(
U.Jy / U.pix**2, equivalencies=[self._datacube.arcsec2_to_pix]
)
* U.pix**2
).to(U.Jy)
if not self.quiet:
# Need a slightly different calculation for a completely unresolved source.
inserted_flux_density = self.spectrum.sum()
inserted_mass = (
2.36e5
* U.Msun
* self.source.distance.to_value(U.Mpc) ** 2
* inserted_flux_density.to_value(U.Jy)
* self._datacube.channel_width.to_value(U.km / U.s)
)
print(
"Source inserted.",
f" Flux density in spectrum: {inserted_flux_density:.2e}",
f" Mass in spectrum (assuming distance {self.source.distance:.2f}):"
f" {inserted_mass:.2e}",
f" [{inserted_mass / self.source.input_mass * 100:.0f}%"
f" of initial source mass]",
sep="\n",
)
@property
def spectrum(self) -> U.Quantity[U.Jy]:
"""
The spectrum of the source with spatial information integrated out.
The spectrum is calculated when this property is accessed if this hasn't already
been done.
Returns
-------
~astropy.units.Quantity
:class:`~astropy.units.Quantity` with dimensions of flux density.
Spatially-integrated spectrum of the source.
"""
if not hasattr(self, "_spectrum"):
self.insert_source_in_spectrum()
return self._spectrum
@property
def channel_edges(self) -> U.Quantity[U.Hz] | U.Quantity[U.km / U.s]:
"""
The edges of the channels for the spectrum.
Returns
-------
~astropy.units.Quantity
:class:`~astropy.units.Quantity` with dimensions of frequency or velocity.
Edges of the channels with units depending on
:class:`~martini.martini.GlobalProfile`'s native channel spacing.
See Also
--------
martini.martini.GlobalProfile.channel_mids
martini.martini.GlobalProfile.velocity_channel_mids
martini.martini.GlobalProfile.velocity_channel_edges
martini.martini.GlobalProfile.frequency_channel_mids
martini.martini.GlobalProfile.frequency_channel_edges
"""
return self._datacube.channel_edges
@property
def channel_mids(self) -> U.Quantity[U.Hz] | U.Quantity[U.km / U.s]:
"""
The centres of the channels for the spectrum.
Returns
-------
~astropy.units.Quantity
:class:`~astropy.units.Quantity` with dimensions of frequency or velocity.
Edges of the channels with units depending on
:class:`~martini.martini.GlobalProfile`'s native channel spacing.
See Also
--------
martini.martini.GlobalProfile.channel_edges
martini.martini.GlobalProfile.velocity_channel_mids
martini.martini.GlobalProfile.velocity_channel_edges
martini.martini.GlobalProfile.frequency_channel_mids
martini.martini.GlobalProfile.frequency_channel_edges
"""
return self._datacube.channel_mids
@property
def frequency_channel_edges(self) -> U.Quantity[U.Hz]:
"""
The edges of the frequency channels for the spectrum.
Returns
-------
~astropy.units.Quantity
:class:`~astropy.units.Quantity` with dimensions of frequency.
Edges of the channels with frequency units.
See Also
--------
martini.martini.GlobalProfile.velocity_channel_mids
martini.martini.GlobalProfile.velocity_channel_edges
martini.martini.GlobalProfile.frequency_channel_mids
"""
return self._datacube.channel_edges
@property
def frequency_channel_mids(self) -> U.Quantity[U.Hz]:
"""
The centres of the frequency channels for the spectrum.
Returns
-------
~astropy.units.Quantity
:class:`~astropy.units.Quantity` with dimensions of frequency.
Edges of the channels with frequency units.
See Also
--------
martini.martini.GlobalProfile.velocity_channel_mids
martini.martini.GlobalProfile.velocity_channel_edges
martini.martini.GlobalProfile.frequency_channel_edges
"""
return self._datacube.channel_mids
@property
def velocity_channel_edges(self) -> U.Quantity[U.km / U.s]:
"""
The edges of the channels for the spectrum in velocity units.
Returns
-------
~astropy.units.Quantity
:class:`~astropy.units.Quantity` with dimensions of velocity.
Edges of the channels with velocity units.
See Also
--------
martini.martini.GlobalProfile.velocity_channel_mids
martini.martini.GlobalProfile.frequency_channel_mids
martini.martini.GlobalProfile.frequency_channel_edges
"""
return self._datacube.channel_edges
@property
def velocity_channel_mids(self) -> U.Quantity[U.km / U.s]:
"""
The centres of the channels for the spectrum.
Returns
-------
~astropy.units.Quantity
:class:`~astropy.units.Quantity` with dimensions of velocity.
Edges of the channels with velocity units.
See Also
--------
martini.martini.GlobalProfile.velocity_channel_edges
martini.martini.GlobalProfile.frequency_channel_mids
martini.martini.GlobalProfile.frequency_channel_edges
"""
return self._datacube.channel_mids
@property
def channel_width(self) -> U.Quantity[U.Hz] | U.Quantity[U.km / U.s]:
"""
The width of the channels for the spectrum.
Returns
-------
~astropy.units.Quantity
:class:`~astropy.units.Quantity` with dimentions of frequency or velocity.
Width of the channels with units depending on
:class:`~martini.martini.GlobalProfile`'s ``channels`` argument.
"""
return self._datacube.channel_width
[docs]
def reset(self) -> None:
"""Re-initialize the spectrum with zero-values."""
super().reset()
if hasattr(self, "_spectrum"):
del self._spectrum
return
[docs]
def plot_spectrum(
self,
fig: int = 1,
title: str = "",
channels: str = "velocity",
show_vsys: bool = True,
save: str | None = None,
) -> "Figure":
"""
Produce a figure showing the spectrum.
Parameters
----------
fig : int, optional
Number of the figure in matplotlib, it will be created as ``plt.figure(fig)``.
title : str, optional
A title for the figure can be provided.
channels : str, optional
The type of spectral axis for the plot, either ``"velocity"`` or
``"frequency"``.
show_vsys : bool, optional
If ``True``, draw a vertical line at the source systemic velocity.
save : str, optional
If provided, the figure is saved using ``plt.savefig(save)``. A `.png` or
`.pdf` suffix is recommended.
Returns
-------
matplotlib.figure.Figure
The spectrum :class:`~matplotlib.figure.Figure`.
"""
import matplotlib.pyplot as plt
if channels == "velocity":
channel_mids = self.velocity_channel_mids
elif channels == "frequency":
channel_mids = self.frequency_channel_mids
else:
raise ValueError(
"`plot_spectrum` argument `channels` must be 'velocity' or 'frequency'."
)
figure = plt.figure(fig, figsize=(4, 3))
figure.clf()
figure.suptitle(title)
xunit = {"velocity": U.km * U.s**-1, "frequency": U.GHz}[channels]
ax = figure.add_subplot(1, 1, 1)
ax.plot(
channel_mids.to_value(xunit),
self.spectrum.to_value(U.Jy),
ls="solid",
color="black",
lw=2,
)
if show_vsys:
ax.axvline(
self.source.vsys.to_value(xunit),
linestyle="dotted",
lw=1.5,
color="black",
)
ax.set_ylabel(r"Flux density $[\mathrm{Jy}]$")
if channels == "velocity":
ax.set_xlabel(r"Velocity $[\mathrm{km}\,\mathrm{s}^{-1}]$")
elif channels == "frequency":
ax.set_xlabel(r"Frequency $[\mathrm{GHz}]$")
if save is not None:
plt.savefig(save)
return figure