Source code for martini.sources.sph_source

"""
Provides the generic :class:`~martini.sources.sph_source.SPHSource` class.

Enables working with any SPH or similar simulation as input.
"""

import numpy as np
from scipy.spatial.transform import Rotation
from typing import TYPE_CHECKING
from astropy.coordinates import (
    CartesianRepresentation,
    CartesianDifferential,
    SphericalRepresentation,
    SkyCoord,
    SpectralCoord,
    ICRS,
)
import astropy.units as U
from ._L_align import L_align
from ._extra_cartesian_transforms import (
    translate,
    translate_d,
    affine_transform,
    affine_transform_d,
)
from ..datacube import HIfreq, DataCube
from ..L_coords import L_coords

if TYPE_CHECKING:
    from matplotlib.figure import Figure
    from astropy.coordinates.builtin_frames.baseradec import BaseRADecFrame

# Extend CartesianRepresentation to allow more transformations
setattr(CartesianRepresentation, "translate", translate)
setattr(CartesianRepresentation, "affine_transform", affine_transform)

# Extend CartesianDifferential to allow more transformations
setattr(CartesianDifferential, "translate", translate_d)
setattr(CartesianDifferential, "affine_transform", affine_transform_d)

_origin = CartesianRepresentation(
    np.zeros((3, 1)) * U.kpc,
    differentials={"s": CartesianDifferential(np.zeros((3, 1)) * U.km / U.s)},
)

# Affine transform matrices have mixed dimensionful and dimensionless elements so need to
# be processed as raw arrays. We define conventional units for length and velocity type
# affine transforms to ensure internal consistency.
_COORDINATE_TRANSFORM_UNITS = U.Mpc
_VELOCITY_TRANSFORM_UNITS = U.km / U.s


[docs] class SPHSource(object): r""" Class abstracting HI emission sources consisting of SPH simulation particles. This class constructs an HI emission source from arrays of SPH particle properties: mass, smoothing length, temperature, position, and velocity. Parameters ---------- distance : ~astropy.units.Quantity :class:`~astropy.units.Quantity`, with dimensions of length. Source distance, also used to set the velocity offset via Hubble's law. vpeculiar : ~astropy.units.Quantity, optional :class:`~astropy.units.Quantity`, with dimensions of velocity. Source peculiar velocity along the direction to the source centre. rotation : ~scipy.spatial.transform.Rotation, optional A rotation to apply to the source particles, specified using the :class:`~scipy.spatial.transform.Rotation` class. That class supports many ways to specify a rotation (Euler angle, rotation matrices, quaternions, etc.). Refer to the :mod:`scipy` documentation for details. Note that the ``y-z`` plane will be the one eventually placed in the plane of the "sky". Cannot be used at the same time as ``L_coords``. L_coords : ~martini.L_coords.L_coords, optional A named tuple specifying 3 angles. Import it as ``from martini import L_coords``. The angles are used to orient the galaxy relative to its angular momentum vector, "L". The routine will first identify a preferred plane based on the angular momenta of the central 1/3 of HI gas. This plane will then be rotated to lie in the plane of the "sky" (``y-z`` plane), rotated by an angle ``az_rot`` around the angular momentum vector (rotation around ``x``), then inclined by ``incl`` towards or away from the line of sight (rotation around ``y``) and finally rotated on the sky to set the position angle ``pa`` (second rotation around ``x``). All rotations are extrinsic. The position angle refers to the receding side of the galaxy measured East of North. The angles should be specified using syntax like: ``L_coords=L_coords(incl=0 * U.deg, pa=270 * U.deg, az_rot=0 * U.deg)``. These example values are the defaults. Cannot be used at the same time as ``rotation``. ra : ~astropy.units.Quantity, optional :class:`~astropy.units.Quantity`, with dimensions of angle. Right ascension for the source centroid. dec : ~astropy.units.Quantity, optional :class:`~astropy.units.Quantity`, with dimensions of angle. Declination for the source centroid. h : float, optional Dimensionless hubble constant, :math:`H_0 = h (100\\,\\mathrm{km}\\,\\mathrm{s}^{-1}\\,\\mathrm{Mpc}^{-1})`. T_g : ~astropy.units.Quantity, optional :class:`~astropy.units.Quantity`, with dimensions of temperature. Particle temperature. mHI_g : ~astropy.units.Quantity :class:`~astropy.units.Quantity`, with dimensions of mass. Particle HI mass. xyz_g : ~astropy.units.Quantity :class:`~astropy.units.Quantity` , with dimensions of length. Particle position offset from source centroid. Note that the 'y-z' plane is that eventually placed in the plane of the "sky"; 'x' is the axis corresponding to the "line of sight". vxyz_g : ~astropy.units.Quantity :class:`~astropy.units.Quantity`, with dimensions of velocity. Particle velocity offset from source centroid. Note that the 'y-z' plane is that eventually placed in the plane of the "sky"; 'x' is the axis corresponding to the "line of sight". hsm_g : ~astropy.units.Quantity, optional :class:`~astropy.units.Quantity`, with dimensions of length. Particle SPH smoothing lengths, defined as the FWHM of the smoothing kernel. Smoothing lengths are variously defined in the literature as the radius where the kernel amplitude reaches 0, or some rational fraction of this radius (and other definitions may well exist). The FWHM requested here is not a standard choice (with the exception of SWIFT snapshots!), but has the advantage of avoiding ambiguity in the definition. coordinate_axis : int, optional Rank of axis corresponding to position or velocity of a single particle. I.e. ``coordinate_axis=0`` if shape is (3, N), or ``1`` if (N, 3). Usually prefer to omit this as it can be determined automatically, but is ambiguous for sources with exactly 3 particles. coordinate_frame : ~astropy.coordinates.builtin_frames.baseradec.BaseRADecFrame Optional. The coordinate frame assumed in converting particle coordinates to RA and Dec, and for transforming coordinates and velocities to the data cube frame. The frame needs to have a well-defined velocity as well as spatial origin. Recommended frames are :class:`~astropy.coordinates.GCRS`, :class:`~astropy.coordinates.ICRS`, :class:`~astropy.coordinates.HCRS`, :class:`~astropy.coordinates.LSRK`, :class:`~astropy.coordinates.LSRD` or :class:`~astropy.coordinates.LSR`. The frame should be passed initialized, e.g. ``ICRS()`` (not just ``ICRS``). """ h: float T_g: U.Quantity[U.K] | None mHI_g: U.Quantity[U.Msun] coordinates_g: U.Quantity[U.kpc] hsm_g: U.Quantity[U.kpc] | None npart: int ra: U.Quantity[U.deg] dec: U.Quantity[U.deg] distance: U.Quantity[U.Mpc] vpeculiar: U.Quantity[U.km / U.s] _coordinate_affine_transform: np.ndarray _velocity_affine_transform: np.ndarray vhubble: U.Quantity[U.km / U.s] vsys: U.Quantity[U.km / U.s] sky_coordinates: ICRS coordinate_frame: "BaseRADecFrame" pixcoords: U.Quantity[U.pix] input_mass: U.Quantity[U.Msun] skycoords: SkyCoord | None spectralcoords: SpectralCoord | None def __init__( self, *, distance: U.Quantity[U.Mpc], vpeculiar: U.Quantity[U.km / U.s] = 0.0 * U.km / U.s, rotation: Rotation | None = None, L_coords: L_coords | None = None, ra: U.Quantity[U.deg] = 0.0 * U.deg, dec: U.Quantity[U.deg] = 0.0 * U.deg, h: float = 0.7, T_g: U.Quantity[U.K] | None = None, mHI_g: U.Quantity[U.Msun], xyz_g: U.Quantity[U.kpc], vxyz_g: U.Quantity[U.km / U.s], hsm_g: U.Quantity[U.kpc] | None = None, coordinate_axis: int | None = None, coordinate_frame: "BaseRADecFrame" = ICRS(), ) -> None: if isinstance(rotation, dict): raise ValueError( "The method to specify rotations in martini has been updated. Replace:\n" "1) rotation={'rotmat': <rotation_matrix>}\n" " with:\n" " from scipy.spatial.transform import Rotation\n" " rotation=Rotation.from_matrix(<rotation_matrix>)\n" "2) rotation={'axis_angle': (<axis>, <angle>)}\n" " with:\n" " rotation=Rotation.from_euler(<axis>, <angle>.to_value(U.rad))\n" "3) rotation={'L_coords': (<incl>, <az_rot>[, <pa>])}\n" " with:\n" " from martini import L_coords\n" " L_coords=L_coords(incl=<incl>, az_rot=<az_rot>[, pa=<pa>])\n" "Refer to https://martini.readthedocs.io/en/stable/sources/index.html" "#rotation-and-translation and " "https://martini.readthedocs.io/en/stable/modules/" "martini.sources.sph_source.html#martini.sources.sph_source.SPHSource " "for further details." ) if coordinate_axis is None: if (xyz_g.shape[0] == 3) and (xyz_g.shape[1] != 3): coordinate_axis = 0 elif (xyz_g.shape[0] != 3) and (xyz_g.shape[1] == 3): coordinate_axis = 1 elif xyz_g.shape == (3, 3): raise RuntimeError( "martini.sources.SPHSource: cannot guess " "coordinate_axis with shape (3, 3), provide" " explicitly." ) else: raise RuntimeError( "martini.sources.SPHSource: incorrect " "coordinate shape (not (3, N) or (N, 3))." ) else: coordinate_axis = coordinate_axis if xyz_g.shape != vxyz_g.shape: raise ValueError( "martini.sources.SPHSource: xyz_g and vxyz_g must have matching shapes." ) if coordinate_axis == 0: xyz_g = xyz_g.T vxyz_g = vxyz_g.T self.coordinate_frame = coordinate_frame self.h = h self.T_g = T_g self.mHI_g = mHI_g self.input_mass = mHI_g.sum() self.coordinates_g = CartesianRepresentation( xyz_g, xyz_axis=1, differentials={"s": CartesianDifferential(vxyz_g, xyz_axis=1)}, ) self.hsm_g = hsm_g self.npart = self.coordinates_g.size self.ra = ra self.dec = dec self.distance = distance self.vpeculiar = vpeculiar self.vhubble = (self.h * 100.0 * U.km * U.s**-1 * U.Mpc**-1) * self.distance self.vsys = self.vhubble + self.vpeculiar self._coordinate_affine_transform = np.eye(4) self._velocity_affine_transform = np.eye(4) self.rotate(rotation=rotation, L_coords=L_coords) # complains if both not None self.skycoords = None self.spectralcoords = None self.pixcoords = None return def _init_skycoords(self, _reset: bool = True) -> None: """ Initialize the sky coordinates of the particles. Parameters ---------- _reset : bool If ``True``, return particles to their original positions. Setting to ``False`` is only intended for testing. """ # _reset False only for unit testing distance_unit_vector = ( SphericalRepresentation(self.ra, self.dec, 1) .represent_as(CartesianRepresentation) .xyz ) distance_vector = distance_unit_vector * self.distance vpeculiar_vector = distance_unit_vector * self.vpeculiar self.rotate(Rotation.from_euler("y", -self.dec.to_value(U.rad))) self.rotate(Rotation.from_euler("z", self.ra.to_value(U.rad))) self.translate(distance_vector) # must be after translate: # \vec{v_H} = (100 h km/s/Mpc * D) * r^, but D * r^ is just \vec{r}: vhubble_vectors = ( self.h * 100.0 * U.km * U.s**-1 * U.Mpc**-1 ) * self.coordinates_g.xyz self.boost(vpeculiar_vector) # can't use boost for particle-by-particle velocities: self.coordinates_g.differentials["s"] = CartesianDifferential( self.coordinates_g.differentials["s"].d_xyz + vhubble_vectors ) self.skycoords = SkyCoord( self.coordinates_g, frame=self.coordinate_frame, copy=True ) origin_skycoord = SkyCoord( x=0 * U.kpc, y=0 * U.kpc, z=0 * U.kpc, v_x=0 * U.km / U.s, v_y=0 * U.km / U.s, v_z=0 * U.km / U.s, representation_type="cartesian", differential_type="cartesian", frame=self.coordinate_frame, ) self.spectralcoords = SpectralCoord( self.skycoords.radial_velocity, doppler_convention="radio", doppler_rest=HIfreq, target=self.skycoords, observer=origin_skycoord, ) if _reset: self.coordinates_g.differentials["s"] = CartesianDifferential( self.coordinates_g.differentials["s"].d_xyz - vhubble_vectors ) self.boost(-vpeculiar_vector) self.translate(-distance_vector) self.rotate(Rotation.from_euler("z", -self.ra.to_value(U.rad))) self.rotate(Rotation.from_euler("y", self.dec.to_value(U.rad))) return def _init_pixcoords(self, datacube: DataCube, origin: int = 0) -> None: """ Initialize pixel coordinates of the particles. Parameters ---------- datacube : ~martini.datacube.DataCube The DataCube (including its WCS) for which to calculate coordinates. origin : int Index of the first pixel in the WCS (FITS-style is 1, python-style is 0). """ assert self.skycoords is not None, ( "Initialize source.skycoords before calling _init_pixcoords." ) assert self.spectralcoords is not None, ( "Initialize source.spectralcoords before calling _init_pixcoords." ) skycoords_df_frame = self.skycoords.transform_to(datacube.coordinate_frame) spectralcoords_df_specsys = ( self.spectralcoords.with_observer_stationary_relative_to(datacube.specsys) ) # pixels indexed from 0 (not like in FITS!) for better use with numpy self.pixcoords = ( np.vstack( datacube.wcs.sub(3).wcs_world2pix( skycoords_df_frame.ra.to(datacube.units[0]), skycoords_df_frame.dec.to(datacube.units[1]), spectralcoords_df_specsys.to(datacube.units[2]), origin, ) ) * U.pix ) return
[docs] def apply_mask(self, mask: np.ndarray) -> None: """ Remove particles from source arrays according to a mask. Parameters ---------- mask : ~numpy.ndarray Boolean mask. Remove particles with indices corresponding to ``False`` values from the source arrays. """ if mask.size != self.npart: raise ValueError("Mask must have same length as particle arrays.") mask_sum = np.sum(mask) if mask_sum == 0: raise RuntimeError("No non-zero mHI source particles in target region.") self.npart = mask_sum if self.T_g is not None and not self.T_g.isscalar: self.T_g = self.T_g[mask] if not self.mHI_g.isscalar: self.mHI_g = self.mHI_g[mask] self.coordinates_g = self.coordinates_g[mask] if self.skycoords is not None: self.skycoords = self.skycoords[mask] if self.spectralcoords is not None: self.spectralcoords = self.spectralcoords[mask] if self.pixcoords is not None: self.pixcoords = self.pixcoords[:, mask] if self.hsm_g is not None and not self.hsm_g.isscalar: self.hsm_g = self.hsm_g[mask] return
def _append_to_coordinate_affine_transform( self, affine_transform: np.ndarray ) -> None: """ Add a new transformation to the sequence for the spatial coordinates. The cumulative transformation is stored as a single 4x4 transformation matrix, so we update the current transformation using a dot product. Affine transform matrices contain a mix of dimensionless and dimensionful values so we need to process them as raw numpy arrays. By convention in :mod:`martini` the elements with (implicit) length dimensions are always Mpc. Parameters ---------- affine_transform : :class:`~numpy.ndarray` The affine transform to add to the cumulative coordinate transformation. """ self._coordinate_affine_transform = np.dot( affine_transform, self._coordinate_affine_transform ) return def _append_to_velocity_affine_transform( self, affine_transform: np.ndarray ) -> None: """ Add a new transformation to the sequence for the velocity coordinates. The cumulative transformation is stored as a single 4x4 transformation matrix, so we update the current transformation using a dot product. Affine transform matrices contain a mix of dimensionless and dimensionful values so we need to process them as raw numpy arrays. By convention in :mod:`martini` the elements with (implicit) velocity dimensions are always km/s. Parameters ---------- affine_transform : :class:`~numpy.ndarray` The affine transform to add to the cumulative velocity transformation. """ self._velocity_affine_transform = np.dot( affine_transform, self._velocity_affine_transform, ) return
[docs] def rotate( self, rotation: Rotation | None = None, *, L_coords: L_coords | None = None, ) -> Rotation: """ Rotate the source. The arguments correspond to different rotation types. Multiple types cannot be given in a single function call. Parameters ---------- rotation : ~scipy.spatial.transform.Rotation, optional A :class:`~scipy.spatial.transform.Rotation` specifying the rotation. This type of object can be initialized from many ways of specifying rotations: rotation matrices, Euler angles, quaternions, etc. Refer to the :mod:`scipy` documentation for details. L_coords : ~martini.L_coords.L_coords, optional First element containing an inclination, second element an azimuthal angle (both :class:`~astropy.units.Quantity` instances with dimensions of angle). The routine will first attempt to identify a preferred plane based on the angular momenta of the central 1/3 of particles in the source. This plane will then be rotated to lie in the 'y-z' plane, followed by a rotation by the azimuthal angle about its angular momentum pole (rotation about 'x'), and then inclined (rotation about 'y'). By default the position angle on the sky is 270 degrees, but if a third element is provided it sets the position angle (second rotation about 'x'). Returns ------- ~scipy.spatial.transform.Rotation A :class:`~scipy.spatial.transform.Rotation` specifying the actual rotation done in this function call. """ args_given = (rotation is not None, L_coords is not None) if np.sum(args_given) == 0: # no-op return Rotation.identity() elif np.sum(args_given) > 1: raise ValueError("Multiple rotations in a single call not allowed.") if rotation is not None: do_rot = rotation.as_matrix() if L_coords is not None: do_rot = np.eye(3) do_rot = L_align( self.coordinates_g.get_xyz(), self.coordinates_g.differentials["s"].get_d_xyz(), self.mHI_g, frac=0.3, Laxis="x", ).dot(do_rot) do_rot = ( Rotation.from_euler("x", L_coords.az_rot.to_value(U.rad)) .as_matrix() .dot(do_rot) ) do_rot = ( Rotation.from_euler("y", L_coords.incl.to_value(U.rad)) .as_matrix() .dot(do_rot) ) do_rot = ( Rotation.from_euler( "x", ( L_coords.pa - 90 * U.deg if L_coords.incl >= 0 else L_coords.pa - 270 * U.deg ).to_value(U.rad), ) .as_matrix() .dot(do_rot) ) affine_transform = np.eye(4) affine_transform[:3, :3] = do_rot self._append_to_coordinate_affine_transform(affine_transform) self._append_to_velocity_affine_transform(affine_transform) self.coordinates_g = self.coordinates_g.transform(do_rot) return Rotation.from_matrix(do_rot)
[docs] def translate(self, translation_vector: U.Quantity[U.kpc]) -> None: """ Translate the source. Note that the "line of sight" is along the 'x' axis. Parameters ---------- translation_vector : ~astropy.units.Quantity :class:`~astropy.units.Quantity` with shape (3, ), with dimensions of length. Vector by which to offset the source particle coordinates. """ self.coordinates_g = self.coordinates_g.translate(translation_vector) affine_transform = np.eye(4) affine_transform[:3, 3] = translation_vector.squeeze().to_value( _COORDINATE_TRANSFORM_UNITS ) self._append_to_coordinate_affine_transform(affine_transform) return
[docs] def boost(self, boost_vector: U.Quantity[U.km / U.s]) -> None: """ Apply an offset to the source velocity. Note that the "line of sight" is along the 'x' axis. Parameters ---------- boost_vector : ~astropy.units.Quantity :class:`~astropy.units.Quantity` with shape (3, ), with dimensions of velocity. Vector by which to offset the source particle velocities. """ self.coordinates_g.differentials["s"] = self.coordinates_g.differentials[ "s" ].translate(boost_vector) affine_transform = np.eye(4) affine_transform[:3, 3] = boost_vector.squeeze().to_value( _VELOCITY_TRANSFORM_UNITS ) self._append_to_velocity_affine_transform(affine_transform) return
@property def current_rotation(self) -> np.ndarray: """ Current rotation matrix of the source. Returns ------- ~numpy.ndarray The rotation matrix taking the coordinate originally passed in to the source to the current orientation. See Also -------- ~martini.sources.sph_source.SPHSource.save_current_rotation ~martini.sources.sph_source.SPHSource.save_current_affine_transformations ~martini.sources.sph_source.SPHSource.load_current_affine_transformations """ # The rotation part of the _coordinate_affine_transform and the # _velocity_affine_transform are identical, just pick one. return self._coordinate_affine_transform[:3, :3]
[docs] def save_current_rotation(self, fname: str) -> None: """ Output current rotation matrix to file. The rotation matrix can be applied to astropy coordinates (e.g. a :class:`~astropy.coordinates.representation.cartesian.CartesianRepresentation`) as ``coordinates.transform(np.loadtxt(fname))``. If you want to save the full coordinate transformation state of a :mod:`martini` source (and optionally re-load it later), see :meth:`~martini.source.sph_source.SPHSource.save_current_affine_transformations` and :meth:`~martini.source.sph_source.SPHSource.load_affine_transformations`. Parameters ---------- fname : str File in which to save rotation matrix (as a text file). See Also -------- ~martini.sources.sph_source.SPHSource.save_current_affine_transformations ~martini.sources.sph_source.SPHSource.load_affine_transformations """ np.savetxt(fname, self._coordinate_affine_transform[:3, :3]) return
[docs] def save_current_affine_transformations(self, fname: str) -> None: r""" Output current affine transformation matrices (position and velocity) to a file. These matrices encode the arbitrary combination of translations and rotations that takes the source from its initial orientation when loaded into :mod:`martini` to its current orientation. The rotation part is dimensionless, while the translation part assumes dimensions of Mpc for positions or km/s for velocities (but is encoded without units). An affine transformation matrix looks like: .. math:: A = \left[ {\begin{array}{cccc} R_{00} & R_{01} & R_{02} & t_{0} \\ R_{10} & R_{11} & R_{12} & t_{1} \\ R_{20} & R_{21} & R_{22} & t_{2} \\ 0 & 0 & 0 & 1 \\ \end{array} } \right] Where :math:`R` is a rotation matrix and :math:`t` is a translation vector (either in position or velocity). The affine transformation applied to a vector :math:`x` is equivalent to :math:`Rx+t`. The saved file contains two affine transformation matrices (in order: one for position, one for velocity) as an array with shape ``(2, 4, 4)``. The rotational part of the two matrices is identical. Parameters ---------- fname : str File in which to save affine transformation matrices (in ``*.npy`` format). See Also -------- ~martini.sources.sph_source.SPHSource.load_affine_transformations """ np.save( fname, np.array( [self._coordinate_affine_transform, self._velocity_affine_transform] ), ) return
[docs] def load_affine_transformations(self, fname: str) -> None: r""" Load a set of affine transformation matrices (position and velocity) from a file. These matrices encode an arbitrary combination of translations and rotations. When loaded they are applied to the source to transform it from its current orientation. The state is only the one that the source had when the files were saved if it has not been transformed since being loaded into :mod:`martini`. The rotation part of these matrices is dimensionless, while the translation part assumes dimensions of Mpc for positions or km/s for velocities (but is encoded without units). An affine transformation matrix looks like: .. math:: A = \left[ {\begin{array}{cccc} R_{00} & R_{01} & R_{02} & t_{0} \\ R_{10} & R_{11} & R_{12} & t_{1} \\ R_{20} & R_{21} & R_{22} & t_{2} \\ 0 & 0 & 0 & 1 \\ \end{array} } \right] Where :math:`R` is a rotation matrix and :math:`t` is a translation vector (either in position or velocity). The affine transformation applied to a vector :math:`x` is equivalent to :math:`Rx+t`. The loaded file should contain two affine transformation matrices (in order: one for position, one for velocity) as an array with shape ``(2, 4, 4)``. The rotational part of the two matrices should be identical. The translation parts should assume implicit units of Mpc and km/s. This is the format saved by :meth:`~martini.sources.sph_source.SPHSource.save_current_affine_transformations`. Paramters --------- fname : str File from which to load affine transformation matrices (in ``*.npy`` format). Raises ------ ValueError If the rotation part of the two matrices is not identical: this would result in an inconsistent source state. See Also -------- ~martini.sources.sph_source.SPHSource.save_current_affine_transformations """ coordinate_affine_transform, velocity_affine_transform = np.load(fname) if not np.allclose( coordinate_affine_transform[:3, :3], velocity_affine_transform[:3, :3] ): raise ValueError( "Rotation portion of position and velocity affine transforms do not" " match, source would be put into an inconsistent state." ) self.coordinates_g = self.coordinates_g.affine_transform( coordinate_affine_transform, _COORDINATE_TRANSFORM_UNITS ) self.coordinates_g.differentials["s"] = self.coordinates_g.differentials[ "s" ].affine_transform(velocity_affine_transform, _VELOCITY_TRANSFORM_UNITS) self._append_to_coordinate_affine_transform(coordinate_affine_transform) self._append_to_velocity_affine_transform(velocity_affine_transform)
[docs] def preview( self, max_points: int = 5000, fig: "int | Figure" = 1, lim: U.Quantity[U.deg] | None = None, vlim: U.Quantity[U.km / U.s] | None = None, point_scaling: str = "auto", title: str = "", save: str | None = None, ) -> "Figure": """ Produce a figure showing the source particle coordinates and velocities. Makes a 3-panel figure showing the projection of the source as it will appear in the mock observation. The first panel shows the particles in the y-z plane, coloured by the x-component of velocity (MARTINI projects the source along the x-axis). The second and third panels are position-velocity diagrams showing the x-component of velocity against the y and z coordinates, respectively. Parameters ---------- max_points : int, optional Maximum number of points to draw per panel, the particles will be randomly subsampled if the source has more. fig : int or ~matplotlib.figure.Figure, optional Number of the figure in matplotlib, it will be created as ``plt.figure(fig)``. Or, an existing figure can be provided. lim : ~astropy.units.Quantity, optional :class:`~astropy.units.Quantity`, with dimensions of angle. The coordinate axes extend from ``-lim`` to ``lim``. If unspecified, the maximum angular separation between the source centre defined by its ``ra`` and ``dec`` and the particle sky positions is used. vlim : ~astropy.units.Quantity, optional :class:`~astropy.units.Quantity`, with dimensions of speed. The velocity axes and colour bar extend from ``-vlim`` to ``vlim``. If unspecified, the maximum absolute velocity of particles in the source is used. point_scaling : str, optional By default points are scaled in size and transparency according to their HI mass and the smoothing length (loosely proportional to their surface densities, but with different scaling to achieve a visually useful plot). For some sources the automatic scaling may not give useful results, using ``point_scaling="fixed"`` will plot points of constant size without opacity. title : str, optional A title for the figure can be provided. save : str, optional If provided, the figure is saved using ``plt.savefig(save)``. A ``.png`` or ``.pdf`` suffix is recommended. Returns ------- ~matplotlib.figure.Figure The preview figure. """ import matplotlib.pyplot as plt self._init_skycoords() assert self.skycoords is not None # placate mypy lim = ( np.max( SkyCoord(self.ra, self.dec, frame=self.coordinate_frame) .separation(self.skycoords) .to_value(U.deg) ) if lim is None else lim.to_value(U.deg) ) vlim = ( np.max( np.abs( (self.skycoords.radial_velocity - self.vsys).to_value(U.km / U.s) ) ) if vlim is None else vlim.to_value(U.km / U.s) ) wrapped_ra = self.skycoords.ra.wrap_at(self.ra + 180 * U.deg) cmask = np.logical_and.reduce( ( np.abs((wrapped_ra - self.ra)).to_value(U.deg) < lim, np.abs((self.skycoords.dec - self.dec)).to_value(U.deg) < lim, np.abs((self.skycoords.radial_velocity - self.vsys)).to_value( U.km / U.s ) < vlim, ) ) nparts = cmask.sum() # every Nth particle to plot at most max_points, or all particles: mask = np.arange(self.mHI_g.size)[cmask][:: max(nparts // max_points, 1)] hsm_factor = ( 1 if (self.hsm_g is None or self.hsm_g.isscalar or mask.size <= 1) else (1 - (self.hsm_g[mask] / self.hsm_g[mask].max()) ** 0.1).to_value( U.dimensionless_unscaled ) # larger -> more transparent + (self.hsm_g[mask].min() == self.hsm_g[mask].max()) * 0.5 # guard against getting all 0s ) alpha = hsm_factor if point_scaling == "auto" else 1.0 if self.hsm_g is None: size_scale = 1 else: size_scale = ( self.hsm_g.to_value(U.kpc) / (lim * U.deg * self.distance).to_value( U.kpc, U.dimensionless_angles() ) if (self.hsm_g.isscalar or mask.size <= 1) else ( self.hsm_g[mask].to_value(U.kpc) / (lim * U.deg * self.distance).to_value( U.kpc, U.dimensionless_angles() ) ) ) size = 300 * size_scale if point_scaling == "auto" else 10 if newfig := not isinstance(fig, plt.Figure): figure = plt.figure(fig, figsize=(12, 4)) else: figure = fig figure.suptitle(title) # ----- MOMENT 1 ----- if newfig: sp1 = figure.add_subplot(1, 3, 1, aspect="equal") sp1.set_facecolor("#222222") sp2 = figure.add_subplot(1, 3, 2) sp2.set_facecolor("#222222") sp3 = figure.add_subplot(1, 3, 3) sp3.set_facecolor("#222222") sp1.set_xlabel(r"$\mathrm{RA}\,[\mathrm{deg}]$") sp1.set_ylabel(r"$\mathrm{Dec}\,[\mathrm{deg}]$") sp2.set_xlabel(r"$\mathrm{RA}\,[\mathrm{deg}]$") sp2.set_ylabel(r"$v_r\,[\mathrm{km}\,\mathrm{s}^{-1}]$") sp3.set_xlabel(r"$\mathrm{Dec}\,[\mathrm{deg}]$") sp3.set_ylabel(r"$v_r\,[\mathrm{km}\,\mathrm{s}^{-1}]$") vmin, vmax = -vlim, vlim else: sp1, sp2, sp3, _ = figure.axes cb = sp1.collections[0].colorbar if cb is not None: vmin, vmax = ( min(-vlim, cb.mappable.get_clim()[0]), max(vlim, cb.mappable.get_clim()[1]), ) scatter = sp1.scatter( wrapped_ra[mask].to_value(U.deg), self.skycoords.dec[mask].to_value(U.deg), c=(self.skycoords.radial_velocity[mask] - self.vsys).to_value(U.km / U.s), marker="o", cmap="coolwarm", edgecolor="None", s=size, vmin=vmin, vmax=vmax, alpha=alpha, zorder=0, ) sp1.plot( [self.ra.to_value(U.deg)], [self.dec.to_value(U.deg)], marker="+", ls="None", mec="white", ms=6, zorder=1, ) if newfig: sp1.set_xlim((self.ra.to_value(U.deg) + lim, self.ra.to_value(U.deg) - lim)) sp1.set_ylim( (self.dec.to_value(U.deg) - lim, self.dec.to_value(U.deg) + lim) ) else: sp1.set_xlim( ( max(sp1.get_xlim()[0], self.ra.to_value(U.deg) + lim), min(sp1.get_xlim()[1], self.ra.to_value(U.deg) - lim), ) ) sp1.set_ylim( ( min(sp1.get_ylim()[0], self.dec.to_value(U.deg) - lim), max(sp1.get_ylim()[1], self.dec.to_value(U.deg) + lim), ) ) if newfig: cb = figure.colorbar(mappable=scatter, ax=sp1, orientation="horizontal") cb.set_label( r"$v_\mathrm{r} - v_\mathrm{sys}\,[\mathrm{km}\,\mathrm{s}^{-1}]$" ) else: if cb is not None: cb.mappable.set_clim((vmin, vmax)) # ----- PV Y ----- sp2.scatter( wrapped_ra[mask].to_value(U.deg), self.skycoords.radial_velocity[mask].to_value(U.km / U.s), c="white", edgecolors="None", marker="o", s=size, alpha=alpha, zorder=0, ) sp2.plot( [self.ra.to_value(U.deg)], [self.vsys.to_value(U.km / U.s)], marker="+", ls="None", mec="red", ms=6, zorder=1, ) if newfig: sp2.set_xlim((self.ra.to_value(U.deg) + lim, self.ra.to_value(U.deg) - lim)) sp2.set_ylim( ( self.vsys.to_value(U.km / U.s) - vlim, self.vsys.to_value(U.km / U.s) + vlim, ) ) else: sp2.set_xlim( ( max(sp2.get_xlim()[0], self.ra.to_value(U.deg) + lim), min(sp2.get_xlim()[1], self.ra.to_value(U.deg) - lim), ) ) sp2.set_ylim( ( min(sp2.get_ylim()[0], self.vsys.to_value(U.km / U.s) - vlim), max(sp2.get_ylim()[1], self.vsys.to_value(U.km / U.s) + vlim), ) ) # ----- PV Z ----- sp3.scatter( self.skycoords.dec[mask].to_value(U.deg), self.skycoords.radial_velocity[mask].to_value(U.km / U.s), c="white", edgecolors="None", marker="o", s=size, alpha=alpha, zorder=0, ) sp3.plot( [self.dec.to_value(U.deg)], [self.vsys.to_value(U.km / U.s)], marker="+", ls="None", mec="red", ms=6, zorder=1, ) if newfig: sp3.set_xlim( (self.dec.to_value(U.deg) - lim, self.dec.to_value(U.deg) + lim) ) sp3.set_ylim( ( self.vsys.to_value(U.km / U.s) - vlim, self.vsys.to_value(U.km / U.s) + vlim, ) ) else: sp3.set_xlim( ( min(sp3.get_xlim()[0], self.dec.to_value(U.deg) - lim), max(sp3.get_xlim()[1], self.dec.to_value(U.deg) + lim), ) ) sp3.set_ylim( ( min(sp3.get_ylim()[0], self.vsys.to_value(U.km / U.s) - vlim), max(sp3.get_ylim()[1], self.vsys.to_value(U.km / U.s) + vlim), ) ) figure.subplots_adjust(wspace=0.3) if save is not None: plt.savefig(save) return figure