Source code for martini.datacube

"""Provide the :class:`~martini.datacube.DataCube` class for creating a data cube."""

from typing import TYPE_CHECKING
from collections.abc import Callable, Iterator
import numpy as np
import astropy.units as U
from astropy import wcs
from astropy.wcs.wcs import WCS
from astropy.coordinates import ICRS, SpectralCoord
import warnings
from astropy.coordinates import frame_transform_graph

if TYPE_CHECKING:
    from astropy.coordinates.builtin_frames.baseradec import BaseRADecFrame

try:
    from typing import Self
except ImportError:
    from typing_extensions import Self

HIfreq: U.Quantity[U.Hz] = 1.420405751e9 * U.Hz
_supported_specsys = frame_transform_graph.get_names()


def _validate_specsys(specsys: str) -> str:
    """
    Check if a specsys is one recognized by astropy.

    Parameters
    ----------
    specsys : str
        A string specifying a specsys to be checked against the list supported by astropy.

    Returns
    -------
    str
        The input specsys string.

    Raises
    ------
    ValueError
        If the specsys is not one recognized by :mod:`astropy`.
    """
    if specsys is not None and specsys not in _supported_specsys:
        raise ValueError(f"Supported specsys values are {_supported_specsys}.")
    return specsys


[docs] class DataCube(object): """ Handles creation and management of the data cube itself. Basic usage simply involves initializing with the parameters listed below. More advanced usage might arise if designing custom classes for other sub- modules, especially beams. To initialize a :class:`~martini.datacube.DataCube` from a saved state, see :meth:`~martini.datacube.DataCube.load_state`. Parameters ---------- n_px_x : int Pixel count along the x (RA) axis. Even integers strongly preferred. n_px_y : int Pixel count along the y (Dec) axis. Even integers strongly preferred. n_channels : int Number of channels along the spectral axis. px_size : ~astropy.units.Quantity :class:`~astropy.units.Quantity`, with dimensions of angle. Angular scale of one pixel. channel_width : ~astropy.units.Quantity :class:`~astropy.units.Quantity`, with dimensions of velocity or frequency. Step size along the spectral axis. Can be provided as a velocity or a frequency. spectral_centre : ~astropy.units.Quantity, optional :class:`~astropy.units.Quantity` with dimensions of velocity or frequency. Velocity (or frequency) of the centre along the spectral axis. Defaults to 0 km/s, normally set this to the systemic velocity of the source. ra : ~astropy.units.Quantity, optional :class:`~astropy.units.Quantity`, with dimensions of angle. Right ascension of the cube centroid. Defaults to 0 degrees. dec : ~astropy.units.Quantity, optional :class:`~astropy.units.Quantity` with dimensions of angle. Declination of the cube centroid. Defaults to 0 degrees. stokes_axis : bool, optional Whether the datacube should be initialized with a Stokes' axis. coordinate_frame : ~astropy.coordinates.builtin_frames.baseradec.BaseRADecFrame, \ optional The coordinate frame of the World Coordinate System (WCS) associated with the data cube. Recommended frames are :class:`~astropy.coordinates.GCRS`, :class:`~astropy.coordinates.ICRS`, :class:`~astropy.coordinates.HCRS`, :class:`~astropy.coordinates.LSRK`, :class:`~astropy.coordinates.LSRD` or :class:`~astropy.coordinates.LSR`. The frame should be passed initialized, e.g. ``ICRS()`` (not just ``ICRS``). specsys : str, optional The spectral reference frame (standard of rest) of the World Coordinate System (WCS) associated with the data cube. Some common options include ``"gcrs"``, ``"icrs"``, ``"hcrs"``, ``"lsrk"``, ``"lsrd"``, ``"lsr"``. For a complete list, use :meth:`astropy.coordinates.frame_transform_graph.get_names`. velocity_centre : ~astropy.units.Quantity, deprecated Deprecated, use spectral centre instead. See Also -------- martini.datacube.DataCube.load_state martini.datacube.DataCube.from_wcs Notes ----- The ``channel_width`` argument defines the channel spacing in either frequency or velocity units. If provided with units of frequency, the data cube channels will be evenly spaced in frequency. Conversely, if provided with units of velocity, the channels will be evenly spaced in velocity. The ``spectral_centre`` is related and can also have units of frequency or velocity, but will be converted to have the same units as the ``channel_width`` in order to define the channels. """ px_size: U.Quantity[U.arcsec] arcsec2_to_pix: tuple[ U.Quantity[U.Jy * U.pix**-2], U.Quantity[U.Jy * U.arcsec**-2], Callable[[U.Quantity[U.Jy * U.pix**-2]], U.Quantity[U.Jy * U.arcsec**-2]], Callable[[U.Quantity[U.Jy * U.arcsec**-2]], U.Quantity[U.Jy * U.pix**-2]], ] channel_width: U.Quantity[U.km / U.s] spectral_centre: U.Quantity[U.km / U.s] ra: U.Quantity[U.deg] dec: U.Quantity[U.deg] padx: int pady: int _array: ( U.Quantity[U.Jy * U.pix**-2] | U.Quantity[U.Jy * U.arcsec**-2] | U.Quantity[U.Jy * U.beam**-1] ) _wcs: WCS | None n_px_x: int n_px_y: int n_channels: int stokes_axis: bool coordinate_frame: "BaseRADecFrame" specsys: str _freq_channel_mode: bool _channel_edges: U.Quantity[U.Hz] | U.Quantity[U.m / U.s] | None _channel_mids: U.Quantity[U.Hz] | U.Quantity[U.m / U.s] | None def __init__( self, *, n_px_x: int, n_px_y: int, n_channels: int, px_size: U.Quantity[U.arcsec], channel_width: U.Quantity[U.km * U.s**-1], spectral_centre: U.Quantity[U.km * U.s**-1] = 0.0 * U.km * U.s**-1, ra: U.Quantity[U.deg] = 0.0 * U.deg, dec: U.Quantity[U.deg] = 0.0 * U.deg, stokes_axis: bool = False, coordinate_frame: "BaseRADecFrame" = ICRS(), specsys: str = "icrs", velocity_centre: None = None, # deprecated ) -> None: if velocity_centre is not None: # pragma: no cover warnings.warn( DeprecationWarning( "velocity_centre is deprecated, use spectral_centre instead." ) ) spectral_centre = velocity_centre self.stokes_axis = stokes_axis self.coordinate_frame = coordinate_frame self.specsys = _validate_specsys(specsys) datacube_unit = U.Jy * U.pix**-2 self._array = np.zeros((n_px_x, n_px_y, n_channels)) * datacube_unit if self.stokes_axis: self._array = self._array[..., np.newaxis] self.n_px_x, self.n_px_y, self.n_channels = n_px_x, n_px_y, n_channels self.px_size = px_size self.arcsec2_to_pix = ( U.Jy * U.pix**-2, U.Jy * U.arcsec**-2, lambda x: x / self.px_size.to_value(U.arcsec) ** 2, lambda x: x * self.px_size.to_value(U.arcsec) ** 2, ) if U.get_physical_type(channel_width) == "frequency": self._freq_channel_mode = True elif U.get_physical_type(channel_width) == "velocity": self._freq_channel_mode = False else: raise ValueError("Channel width must have frequency or velocity units.") self.channel_width = np.abs(channel_width) self.spectral_centre = SpectralCoord( spectral_centre, doppler_convention="radio", doppler_rest=HIfreq, ).to(channel_width.unit) self.ra = ra self.dec = dec self.padx = 0 self.pady = 0 self._channel_edges = None self._channel_mids = None self._wcs = None return
[docs] def velocity_channels(self) -> None: """Issue a warning then do nothing (deprecated).""" warnings.warn( DeprecationWarning( "Changing the channel mode is deprecated. You can access channels in" " deisred units with `DataCube.frequency_channel_edges`," " `DataCube.frequency_channel_mids`, `DataCube.velocity_channel_edges`" " and `DataCube.velocity_channel_mids`." ) ) # pragma: no cover
[docs] def freq_channels(self) -> None: """Issue a warning then do nothing (deprecated).""" warnings.warn( DeprecationWarning( "Changing the channel mode is deprecated. You can access channels in" " deisred units with `DataCube.frequency_channel_edges`," " `DataCube.frequency_channel_mids`, `DataCube.velocity_channel_edges`" " and `DataCube.velocity_channel_mids`." ) ) # pragma: no cover
[docs] @classmethod def from_wcs(cls, input_wcs: WCS, specsys: str | None = None) -> Self: """ Create a DataCube from a World Coordinate System (WCS). The WCS can be obtained, for instance, from a FITS header. To create a MARTINI data cube with pixels and channels exactly matching an observed data cube (stored in a FITS file) would be a bit tedious using the usual :meth:`~martini.datacube.DataCube.__init__` since the number of pixels, channel spacing and so on must be specified by hand. This function instead constructs a :class:`~martini.datacube.DataCube` from a :class:`~astropy.wcs.WCS` instance, that can be easily obtained from a FITS file or header (see example below). The resulting data cube has exactly the same dimensions (pixels and channels) and World Coordinate System (WCS) as the input WCS. Parameters ---------- input_wcs : ~astropy.wcs.WCS The :class:`~astropy.wcs.WCS` instance to use as the basis for the :class:`~martini.datacube.DataCube`. specsys : str, optional The spectral reference frame (standard of rest) of the World Coordinate System (WCS) associated with the data cube, selected from the list ``"gcrs"``, ``"icrs"``, ``"hcrs"``, ``"lsrk"``, ``"lsrd"``, ``"lsr"``. See Also -------- martini.datacube.DataCube Notes ----- MARTINI's data cubes have a fixed axis ordering: first the RA axis, then the Dec axis, then the spectral axis, and finally the Stokes' axis (if present). A :class:`~martini.datacube.DataCube` created with :meth:`~martini.datacube.DataCube.from_wcs` may therefore have its axes re-ordered (tranposed) relative to the ``input_wcs``. The FITS files output by MARTINI have the same axis ordering, and may therefore also be transposed relative to a data cube used to construct the :class:`~astropy.wcs.WCS` for the ``input_wcs`` argument. Examples -------- It is easy to initialize a :class:`~astropy.wcs.WCS` from a FITS-format header. For example, given a FITS file ``my_cube.fits``, setting up a :class:`~martini.datacube.DataCube` with matching World Coordinate System (WCS) looks like:: from astropy import wcs from astropy.io import fits from martini.datacube import DataCube with fits.open("my_cube.fits") as fitsfile: fits_hdr = fitsfile[0].header # header of the main HDU fits_wcs = wcs.WCS(fits_hdr) datacube = DataCube.from_wcs(fits_wcs) """ init_args: dict = { "n_px_x": None, "n_px_y": None, "n_channels": None, "px_size": None, "channel_width": None, "spectral_centre": None, "ra": None, "dec": None, "stokes_axis": None, "coordinate_frame": None, "specsys": None, } for axis_type in input_wcs.get_axis_types(): if axis_type["coordinate_type"] == "stokes": init_args["stokes_axis"] = True if init_args["stokes_axis"] is None: init_args["stokes_axis"] = False centre_coords = input_wcs.all_pix2world( [[n_px // 2 + (1 + n_px % 2) / 2 for n_px in input_wcs.pixel_shape]], 1, # origin, i.e. index pixels from 1 ).squeeze() ax_ra, ax_dec, ax_spec = ( input_wcs.wcs.lng, input_wcs.wcs.lat, input_wcs.wcs.spec, ) unit_ra = U.Unit(input_wcs.world_axis_units[ax_ra], format="fits") unit_dec = U.Unit(input_wcs.world_axis_units[ax_dec], format="fits") unit_spec = U.Unit(input_wcs.world_axis_units[ax_spec], format="fits") ra_px_size = np.abs(input_wcs.wcs.cdelt[ax_ra]) * unit_ra init_args["n_px_x"] = input_wcs.pixel_shape[ax_ra] init_args["ra"] = centre_coords[ax_ra] * unit_ra dec_px_size = input_wcs.wcs.cdelt[ax_dec] * unit_dec init_args["n_px_y"] = input_wcs.pixel_shape[ax_dec] init_args["dec"] = centre_coords[ax_dec] * unit_dec init_args["channel_width"] = np.abs(input_wcs.wcs.cdelt[ax_spec]) * unit_spec init_args["n_channels"] = input_wcs.pixel_shape[ax_spec] init_args["spectral_centre"] = centre_coords[ax_spec] * unit_spec init_args["coordinate_frame"] = wcs.utils.wcs_to_celestial_frame(input_wcs) if specsys is not None: _validate_specsys(specsys) init_args["specsys"] = specsys input_wcs.wcs.specsys = specsys elif input_wcs.wcs.specsys == "": warnings.warn( UserWarning( "Input WCS did not specify 'SPECSYS' (see `specsys` argument to " "`from_wcs` for a work-around)." ) ) else: if input_wcs.wcs.specsys in _supported_specsys: init_args["specsys"] = input_wcs.wcs.specsys elif input_wcs.wcs.specsys.lower() in _supported_specsys: init_args["specsys"] = input_wcs.wcs.specsys.lower() elif input_wcs.wcs.specsys == "BARYCENT": warnings.warn( UserWarning( "Input WCS specified 'SPECSYS' of 'BARYCENT'. Assuming ICRS" " barycentric reference system." ) ) init_args["specsys"] = "icrs" else: raise ValueError( f"Input WCS specified 'SPECSYS' of '{input_wcs.wcs.specsys}' not" " yet supported by MARTINI. Please report using a github issue or" " email kyle.a.oman@durham.ac.uk." ) if ra_px_size != dec_px_size: raise ValueError( "Martini requires square pixels but input wcs has non-square pixels" " (|CDELT| for RA and Dec axes do not match)." ) else: init_args["px_size"] = ra_px_size # == dec_px_size datacube = cls(**init_args) # order celestial, then spectral, then other (stokes): datacube_wcs = input_wcs.reorient_celestial_first() if datacube_wcs.wcs.lat == 0: # RA & Dec swapped datacube_wcs = datacube_wcs.swapaxes(0, 1) if input_wcs.wcs.restfrq == 0.0: warnings.warn( UserWarning( "Input WCS did not specify RESTFRQ, " f"assuming {HIfreq.to_value(U.Hz)} (Hz)." ) ) datacube_wcs.wcs.restfrq = HIfreq.to_value(U.Hz) datacube._wcs = datacube_wcs datacube._freq_channel_mode = ( U.get_physical_type( U.Unit( datacube_wcs.world_axis_units[datacube_wcs.wcs.spec], format="fits" ) ) == "frequency" ) return datacube
@property def units( self, ) -> ( tuple[ U.Quantity[U.deg], U.Quantity[U.deg], U.Quantity[U.Hz] | U.Quantity[U.m / U.s], ] | tuple[ U.Quantity[U.deg], U.Quantity[U.deg], U.Quantity[U.Hz] | U.Quantity[U.m / U.s], U.Quantity[U.dimensionless_unscaled], ] ): """ The units of the DataCube's World Coordinate System (WCS). Returns ------- tuple A tuple containing :class:`~astropy.units.Unit` instances describing the WCS units. """ return tuple(U.Unit(unit, format="fits") for unit in self.wcs.wcs.cunit) @property def wcs(self) -> WCS: """ The DataCube's World Coordinate System (WCS). Returns ------- ~astropy.wcs.WCS The :class:`~astropy.wcs.WCS` instance that describes the :class:`~martini.datacube.DataCube`'s World Coordinate System (WCS). """ if self._wcs is None: hdr = wcs.utils.celestial_frame_to_wcs(self.coordinate_frame).to_header() hdr.update({"WCSAXES": 3}) # add spectral axis hdr.update( { "NAXIS1": self.n_px_x, "NAXIS2": self.n_px_y, "NAXIS3": self.n_channels, } ) hdr.update({"RESTFRQ": HIfreq.to_value(U.Hz)}) self._wcs = wcs.WCS(hdr) self._wcs.wcs.ctype = [ self._wcs.wcs.ctype[0], self._wcs.wcs.ctype[1], "FREQ" if self._freq_channel_mode else "VRAD", ] self._wcs.wcs.specsys = self.specsys self._wcs.wcs.cunit = [ self._wcs.wcs.cunit[0], self._wcs.wcs.cunit[1], ( U.Hz.to_string(format="fits") if self._freq_channel_mode else (U.m / U.s).to_string(format="fits") ), ] self._wcs.wcs.crpix = [ self.n_px_x / 2.0 + 0.5 + self.padx, self.n_px_y / 2.0 + 0.5 + self.pady, self.n_channels / 2.0 + 0.5, ] spec_step_sign = 1 if self._freq_channel_mode else -1 self._wcs.wcs.cdelt = [ -self.px_size.to_value(self._wcs.wcs.cunit[0]), self.px_size.to_value(self._wcs.wcs.cunit[1]), spec_step_sign * np.abs(self.channel_width.to_value(self._wcs.wcs.cunit[2])), ] self._wcs.wcs.crval = [ self.ra.to_value(self._wcs.wcs.cunit[0]), self.dec.to_value(self._wcs.wcs.cunit[1]), self.spectral_centre.to_value(self._wcs.wcs.cunit[2]), ] if self.stokes_axis: self._wcs = wcs.utils.add_stokes_axis_to_wcs( self._wcs, self._wcs.wcs.naxis ) self._wcs.pixel_shape = (self.n_px_x, self.n_px_y, self.n_channels, 1) return self._wcs @property def channel_mids(self) -> U.Quantity[U.Hz] | U.Quantity[U.m / U.s]: """ The centres of the channels from the coordinate system. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of frequency or velocity containing the channel centres. """ if self._channel_mids is None: self._channel_mids = SpectralCoord( ( self.wcs.sub(("spectral",)).all_pix2world( np.arange(self.n_channels), 0, ) * self.units[2] ).squeeze(), doppler_convention="radio", doppler_rest=HIfreq, ) return self._channel_mids @property def channel_edges(self) -> U.Quantity[U.Hz] | U.Quantity[U.m / U.s]: """ The edges of the channels from the coordinate system. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of frequency or velocity containing the channel edges. """ if self._channel_edges is None: self._channel_edges = SpectralCoord( ( self.wcs.sub(("spectral",)).all_pix2world( np.arange(self.n_channels + 1) - 0.5, 0, ) * self.units[2] ).squeeze(), doppler_convention="radio", doppler_rest=HIfreq, ) return self._channel_edges @property def velocity_channel_mids(self) -> U.Quantity[U.m / U.s]: """ The centres of the channels from the coordinate system in velocity units. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of velocity containing the channel centres. """ return self.channel_mids.to(U.m / U.s) @property def velocity_channel_edges(self) -> U.Quantity[U.m / U.s]: """ The edges of the channels from the coordinate system in velocity units. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of velocity containing the channel edges. """ return self.channel_edges.to(U.m / U.s) @property def frequency_channel_mids(self) -> U.Quantity[U.Hz]: """ The centres of the channels from the coordinate system in frequency units. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of frequency containing the channel centres. """ return self.channel_mids.to(U.Hz) @property def frequency_channel_edges(self) -> U.Quantity[U.Hz]: """ The edges of the channels from the coordinate system in frequency units. Returns ------- ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of frequency containing the channel edges. """ return self.channel_edges.to(U.Hz) @property def _stokes_index(self) -> int | None: """ The position of the Stokes' axis in the WCS axis order. Unlike the RA (``wcs.WCS().wcs.lng``), Dec (``wcs.WCS().wcs.lat``) and spectral axis (``wcs.WCS().wcs.spec``), the Stokes' axis isn't exposed in a convenient way, so we implement a helper. Returns ------- int or None Index of the Stokes' axis (or ``None`` if it is not present). """ for index, axis_type in enumerate(self.wcs.get_axis_types()): if axis_type == "stokes": return index return None # not found @property def channel_maps(self) -> Iterator[U.Quantity]: """ An iterator over the channel maps. An alias for :meth:`~martini.datacube.DataCube.spatial_slices`. Returns ------- iter The iterator over the spatial 'slices' of the cube. """ return self.spatial_slices @property def spatial_slices(self) -> Iterator[U.Quantity]: """ An iterator over the spatial 'slices' of the cube. Returns ------- iter The iterator over the spatial 'slices' of the cube. """ if self.stokes_axis: return iter( self._array.squeeze(self._stokes_index).transpose( (self.wcs.wcs.spec, self.wcs.wcs.lng, self.wcs.wcs.lat) ) ) else: return iter( self._array.transpose( (self.wcs.wcs.spec, self.wcs.wcs.lng, self.wcs.wcs.lat) ) ) @property def spectra(self) -> Iterator[U.Quantity]: """ An iterator over the spectra (one in each spatial pixel). Returns ------- iter The iterator over the spectra making up the cube. """ if self.stokes_axis: return iter( self._array.squeeze(self._stokes_index) .transpose((self.wcs.wcs.lng, self.wcs.wcs.lat, self.wcs.wcs.spec)) .reshape(self.n_px_x * self.n_px_y, self.n_channels) ) else: return iter( self._array.transpose( (self.wcs.wcs.lng, self.wcs.wcs.lat, self.wcs.wcs.spec) ).reshape(self.n_px_x * self.n_px_y, self.n_channels) )
[docs] def add_pad(self, pad: tuple[int, int]) -> None: """ Resize the cube to add a padding region in the spatial direction. Accurate convolution with a beam requires a cube padded according to the size of the beam kernel (its representation sampled on a grid with the same spacing). The beam class is required to handle defining the size of pad required. Parameters ---------- pad : tuple 2-tuple (or other sequence) containing the number of pixels to add in the x (RA) and y (Dec) directions. See Also -------- martini.datacube.DataCube.drop_pad """ if self.padx > 0 or self.pady > 0: raise RuntimeError("Tried to add padding to already padded datacube array.") tmp = self._array shape: tuple = ( self.n_px_x + pad[0] * 2, self.n_px_y + pad[1] * 2, self.n_channels, ) if self.stokes_axis: shape = shape + (1,) self._array = np.zeros(shape) self._array = self._array * tmp.unit xregion = np.s_[pad[0] : -pad[0]] if pad[0] > 0 else slice(None, None, None) yregion = np.s_[pad[1] : -pad[1]] if pad[1] > 0 else slice(None, None, None) self._array[xregion, yregion, ...] = tmp extend_crpix = [pad[0], pad[1], 0] if self.stokes_axis: extend_crpix.append(0) self.wcs.wcs.crpix = self.wcs.wcs.crpix + np.array(extend_crpix) self.padx, self.pady = pad return
[docs] def drop_pad(self) -> None: """ Remove the padding added using :meth:`~martini.datacube.DataCube.add_pad`. After convolution, the pad region contains meaningless information and can be discarded. See Also -------- martini.datacube.DataCube.add_pad """ if (self.padx == 0) and (self.pady == 0): return self._array = self._array[self.padx : -self.padx, self.pady : -self.pady, ...] retract_crpix = [self.padx, self.pady, 0] if self.stokes_axis: retract_crpix.append(0) self.wcs.wcs.crpix = self.wcs.wcs.crpix - np.array(retract_crpix) self.padx, self.pady = 0, 0 return
[docs] def copy(self) -> Self: """ Produce a copy of the :class:`~martini.datacube.DataCube`. May be especially useful to create multiple datacubes with differing intermediate steps. Returns ------- ~martini.datacube.DataCube Copy of the :class:`~martini.datacube.DataCube` object. """ copy = type(self)( n_px_x=self.n_px_x, n_px_y=self.n_px_y, n_channels=self.n_channels, px_size=self.px_size, channel_width=self.channel_width, spectral_centre=self.spectral_centre, ra=self.ra, dec=self.dec, ) copy.padx, copy.pady = self.padx, self.pady copy._wcs = self.wcs.copy() copy._freq_channel_mode = self._freq_channel_mode copy._channel_edges = self._channel_edges copy._channel_mids = self._channel_mids copy._array = self._array.copy() return copy
[docs] def save_state(self, filename: str, overwrite: bool = False) -> None: """ Write the :class:`~martini.datacube.DataCube` state to file for re-use. The state can be re-initialized (see :meth:`~martini.datacube.DataCube.load_state`). Note that :mod:`h5py` must be installed for use. NOT for outputting mock observations, for this see :meth:`~martini.martini.Martini.write_fits` and :meth:`~martini.martini.Martini.write_hdf5`. Parameters ---------- filename : str File to write. overwrite : bool Whether to allow overwriting existing files. (default: ``False``). See Also -------- martini.datacube.DataCube.load_state """ import h5py mode = "w" if overwrite else "w-" with h5py.File(filename, mode=mode) as f: array_unit = self._array.unit f["_array"] = self._array.to_value(array_unit) f["_array"].attrs["datacube_unit"] = str(array_unit) f["_array"].attrs["n_px_x"] = self.n_px_x f["_array"].attrs["n_px_y"] = self.n_px_y f["_array"].attrs["n_channels"] = self.n_channels px_size_unit = self.px_size.unit f["_array"].attrs["px_size"] = self.px_size.to_value(px_size_unit) f["_array"].attrs["px_size_unit"] = str(px_size_unit) channel_width_unit = self.channel_width.unit f["_array"].attrs["channel_width"] = self.channel_width.to_value( channel_width_unit ) f["_array"].attrs["channel_width_unit"] = str(channel_width_unit) spectral_centre_unit = self.spectral_centre.unit f["_array"].attrs["spectral_centre"] = self.spectral_centre.to_value( spectral_centre_unit ) f["_array"].attrs["spectral_centre_unit"] = str(spectral_centre_unit) ra_unit = self.ra.unit f["_array"].attrs["ra"] = self.ra.to_value(ra_unit) f["_array"].attrs["ra_unit"] = str(ra_unit) dec_unit = self.dec.unit f["_array"].attrs["dec"] = self.dec.to_value(dec_unit) f["_array"].attrs["dec_unit"] = str(self.dec.unit) f["_array"].attrs["padx"] = self.padx f["_array"].attrs["pady"] = self.pady f["_array"].attrs["_freq_channel_mode"] = int(self._freq_channel_mode) f["_array"].attrs["stokes_axis"] = self.stokes_axis f["_array"].attrs["wcs_hdr"] = self.wcs.to_header_string() return
[docs] @classmethod def load_state(cls, filename: str) -> Self: """ Initialize a :class:`~martini.datacube.DataCube` from a state saved. Note that :mod:`h5py` must be installed for use. Note that ONLY the :class:`~martini.datacube.DataCube` state is restored, other modules and their configurations are not affected. Parameters ---------- filename : str File to open. Returns ------- ~martini.datacube.DataCube A suitably initialized :class:`~martini.datacube.DataCube` object. See Also -------- martini.datacube.DataCube.save_state """ import h5py with h5py.File(filename, mode="r") as f: n_px_x = f["_array"].attrs["n_px_x"] n_px_y = f["_array"].attrs["n_px_y"] n_channels = f["_array"].attrs["n_channels"] px_size = f["_array"].attrs["px_size"] * U.Unit( f["_array"].attrs["px_size_unit"] ) channel_width = f["_array"].attrs["channel_width"] * U.Unit( f["_array"].attrs["channel_width_unit"] ) spectral_centre = f["_array"].attrs["spectral_centre"] * U.Unit( f["_array"].attrs["spectral_centre_unit"] ) ra = f["_array"].attrs["ra"] * U.Unit(f["_array"].attrs["ra_unit"]) dec = f["_array"].attrs["dec"] * U.Unit(f["_array"].attrs["dec_unit"]) stokes_axis = bool(f["_array"].attrs["stokes_axis"]) D = cls( n_px_x=n_px_x, n_px_y=n_px_y, n_channels=n_channels, px_size=px_size, channel_width=channel_width, spectral_centre=spectral_centre, ra=ra, dec=dec, stokes_axis=stokes_axis, ) D.add_pad((f["_array"].attrs["padx"], f["_array"].attrs["pady"])) D._array = f["_array"] * U.Unit(f["_array"].attrs["datacube_unit"]) # must be after add_pad: D._wcs = wcs.WCS(f["_array"].attrs["wcs_hdr"]) return D
def __repr__(self) -> str: """ Print the contents of the data cube array itself. Returns ------- str Text representation of the :attr:`~martini.datacube.DataCube._array` contents. """ return self._array.__repr__()
class _GlobalProfileDataCube(DataCube): """ Helper class that configures a data cube with a single pixel to hold a spectrum. Parameters ---------- n_channels : int Number of channels along the spectral axis. channel_width : ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of velocity or frequency. Step size along the spectral axis. Can be provided as a velocity or a frequency. spectral_centre : ~astropy.units.Quantity :class:`~astropy.units.Quantity` with dimensions of velocity or frequency. Velocity (or frequency) of the centre along the spectral axis. specsys : str, optional The spectral reference frame (standard of rest) of the World Coordinate System (WCS) associated with the data cube, selected from the list ``"gcrs"``, ``"icrs"``, ``"hcrs"``, ``"lsrk"``, ``"lsrd"``, ``"lsr"``. velocity_centre : ~astropy.units.Quantity, deprecated Deprecated, use spectral centre instead. """ def __init__( self, *, n_channels: int, channel_width: U.Quantity[U.km / U.s] | U.Quantity[U.Hz], spectral_centre: U.Quantity[U.km / U.s] | U.Quantity[U.Hz], specsys: str = "icrs", velocity_centre: None = None, # deprecated ) -> None: super().__init__( n_px_x=1, n_px_y=1, n_channels=n_channels, px_size=1 * U.deg, # must be >0, ignored for insertion, needed for units channel_width=channel_width, spectral_centre=spectral_centre, ra=0.0 * U.deg, dec=0.0 * U.deg, stokes_axis=False, coordinate_frame=ICRS(), specsys=specsys, velocity_centre=velocity_centre, ) return