Source code for pyradiosky.utils

# Copyright (c) 2019 Radio Astronomy Software Group
# Licensed under the 2-clause BSD License
"""Utility methods."""

import os

import astropy.units as units
import erfa
import numpy as np
from astropy.coordinates import Angle
from astropy.coordinates.builtin_frames.utils import get_jd12
from astropy.cosmology import Planck15
from astropy.time import Time
from astropy.units import Quantity

from pyradiosky.data import DATA_PATH as SKY_DATA_PATH

f21 = 1.420405751e9


# The frame radio astronomers call the apparent or current epoch is the
# "true equator & equinox" frame, notated E_upsilon in the USNO circular
# astropy doesn't have this frame but it's pretty easy to adapt the CIRS frame
# by modifying the ra to reflect the difference between
# GAST (Grenwich Apparent Sidereal Time) and the earth rotation angle (theta)
def _tee_to_cirs_ra(tee_ra, time):
    """
    Convert from the true equator & equinox frame to the CIRS frame.

    The frame radio astronomers call the apparent or current epoch is the
    "true equator & equinox" frame, notated E_upsilon in the USNO circular
    astropy doesn't have this frame but it's pretty easy to adapt the CIRS frame
    by modifying the ra to reflect the difference between
    GAST (Grenwich Apparent Sidereal Time) and the earth rotation angle (theta)

    Parameters
    ----------
    tee_ra : :class:`astropy.Angle`
        Current epoch RA (RA in the true equator and equinox frame).
    time : :class:`astropy.Time`
        Time object for the epoch of the `tee_ra`.
    """
    era = erfa.era00(*get_jd12(time, "ut1"))
    theta_earth = Angle(era, unit="rad")

    assert isinstance(time, Time)
    assert isinstance(tee_ra, Angle)
    gast = time.sidereal_time("apparent", longitude=0)
    cirs_ra = tee_ra - (gast - theta_earth)
    return cirs_ra


def _cirs_to_tee_ra(cirs_ra, time):
    """
    Convert from CIRS frame to the true equator & equinox frame.

    The frame radio astronomers call the apparent or current epoch is the
    "true equator & equinox" frame, notated E_upsilon in the USNO circular
    astropy doesn't have this frame but it's pretty easy to adapt the CIRS frame
    by modifying the ra to reflect the difference between
    GAST (Grenwich Apparent Sidereal Time) and the earth rotation angle (theta)

    Parameters
    ----------
    cirs_ra : :class:`astropy.Angle`
        CIRS RA.
    time : :class:`astropy.Time`
        Time object for time to convert to the "true equator & equinox" frame.
    """
    era = erfa.era00(*get_jd12(time, "ut1"))
    theta_earth = Angle(era, unit="rad")

    assert isinstance(time, Time)
    assert isinstance(cirs_ra, Angle)
    gast = time.sidereal_time("apparent", longitude=0)
    tee_ra = cirs_ra + (gast - theta_earth)
    return tee_ra


[docs]def stokes_to_coherency(stokes_arr): """ Convert Stokes array to coherency matrix. Parameters ---------- stokes_arr : Quantity Array of stokes parameters in order [I, Q, U, V], shape(4,) or (4, Nfreqs, Ncomponents). Returns ------- coherency matrix : array of float Array of coherencies, shape (2, 2) or (2, 2, Nfreqs, Ncomponents) """ if not isinstance(stokes_arr, Quantity): raise ValueError("stokes_arr must be an astropy Quantity.") initial_shape = stokes_arr.shape if initial_shape[0] != 4: raise ValueError("First dimension of stokes_vector must be length 4.") if stokes_arr.size == 4 and len(initial_shape) == 1: stokes_arr = stokes_arr[:, np.newaxis, np.newaxis] coherency = ( 0.5 * np.array( [ [ stokes_arr[0, :, :] + stokes_arr[1, :, :], stokes_arr[2, :, :] - 1j * stokes_arr[3, :, :], ], [ stokes_arr[2, :, :] + 1j * stokes_arr[3, :, :], stokes_arr[0, :, :] - stokes_arr[1, :, :], ], ] ) * stokes_arr.unit ) if stokes_arr.size == 4 and len(initial_shape) == 1: coherency = coherency.squeeze() return coherency
[docs]def coherency_to_stokes(coherency_matrix): """ Convert coherency matrix to vector of 4 Stokes parameter in order [I, Q, U, V]. Parameters ---------- coherency matrix : Quantity Array of coherencies, shape (2, 2) or (2, 2, Ncomponents) Returns ------- stokes_arr : array of float Array of stokes parameters, shape(4,) or (4, Ncomponents) """ if not isinstance(coherency_matrix, Quantity): raise ValueError("coherency_matrix must be an astropy Quantity.") initial_shape = coherency_matrix.shape if len(initial_shape) < 2 or initial_shape[0] != 2 or initial_shape[1] != 2: raise ValueError("First two dimensions of coherency_matrix must be length 2.") if coherency_matrix.size == 4 and len(initial_shape) == 2: coherency_matrix = coherency_matrix[:, :, np.newaxis] stokes = ( np.array( [ coherency_matrix[0, 0, :] + coherency_matrix[1, 1, :], coherency_matrix[0, 0, :] - coherency_matrix[1, 1, :], coherency_matrix[0, 1, :] + coherency_matrix[1, 0, :], -(coherency_matrix[0, 1, :] - coherency_matrix[1, 0, :]).imag, ] ).real * coherency_matrix.unit ) if coherency_matrix.size == 4 and len(initial_shape) == 2: stokes = stokes.squeeze() return stokes
[docs]def jy_to_ksr(freqs): """ Calculate multiplicative factors to convert [Jy] to [K sr]. Parameters ---------- freqs : :class:`astropy.Quantity` or array_like of float (Deprecated) Frequencies, assumed to be in Hz if not a Quantity. Returns ------- Quantity Conversion factor(s) to go from [Jy] to [K sr]. Shape equal to shape of freqs. """ freqs = np.atleast_1d(freqs) if not isinstance(freqs, Quantity): freqs = freqs * units.Hz equiv = units.brightness_temperature(freqs, beam_area=1 * units.sr) conv_factor = (1 * units.Jy).to(units.K, equivalencies=equiv) * units.sr / units.Jy return conv_factor
[docs]def download_gleam( path=".", filename="gleam.vot", overwrite=False, row_limit=None, for_testing=False ): """ Download the GLEAM vot table from Vizier. Parameters ---------- path : str Folder location to save catalog to. filename : str Filename to save catalog to. overwrite : bool Option to download the file even if it already exists. row_limit : int, optional Max number of rows (sources) to download, default is None meaning download all rows. for_testing : bool Download a file to use for unit tests. If True, some additional columns are included, the rows are limited to 50, the path and filename are set to put the file in the correct location and the overwrite keyword is set to True. """ try: from astroquery.vizier import Vizier except ImportError as e: raise ImportError( "The astroquery module is required to use the download_gleam function." ) from e if for_testing: # pragma: no cover path = SKY_DATA_PATH filename = "gleam_50srcs.vot" overwrite = True row_limit = 50 # We have frequent CI failures with messages like # "Failed to resolve 'vizier.u-strasbg.fr'" # Try using a US mirror to see if we have fewer errors Vizier.VIZIER_SERVER = "vizier.cfa.harvard.edu" opath = os.path.join(path, filename) if os.path.exists(opath) and not overwrite: print( f"GLEAM already downloaded to {opath}. Set overwrite=True to " "re-download it." ) return # full download is too slow for unit tests if row_limit is None: # pragma: no cover Vizier.ROW_LIMIT = -1 else: Vizier.ROW_LIMIT = row_limit desired_columns = [ "GLEAM", "RAJ2000", "DEJ2000", "Fintwide", "e_Fintwide", "alpha", "e_alpha", "chi2", "Fintfit200", "e_Fintfit200", "Fint076", "Fint084", "Fint092", "Fint099", "Fint107", "Fint115", "Fint122", "Fint130", "Fint143", "Fint151", "Fint158", "Fint166", "Fint174", "Fint181", "Fint189", "Fint197", "Fint204", "Fint212", "Fint220", "Fint227", "e_Fint076", "e_Fint084", "e_Fint092", "e_Fint099", "e_Fint107", "e_Fint115", "e_Fint122", "e_Fint130", "e_Fint143", "e_Fint151", "e_Fint158", "e_Fint166", "e_Fint174", "e_Fint181", "e_Fint189", "e_Fint197", "e_Fint204", "e_Fint212", "e_Fint220", "e_Fint227", ] if for_testing: # pragma: no cover desired_columns.extend(["Fpwide", "e_Fp076"]) Vizier.columns = desired_columns catname = "VIII/100/gleamegc" table = Vizier.get_catalogs(catname)[0] for col in desired_columns: assert col in table.colnames, f"column {col} not in downloaded table." table.write(opath, format="votable", overwrite=overwrite) print("GLEAM catalog downloaded and saved to " + opath)
[docs]def flat_spectrum_skymodel( *, variance, nside, ref_chan=0, ref_zbin=0, redshifts=None, freqs=None, frame="icrs" ): """ Generate a full-frequency SkyModel of a flat-spectrum (noiselike) EoR signal. The amplitude of this signal is variance * vol(ref_chan), where vol() gives the voxel volume and ref_chan is a chosen reference point. The generated SkyModel has healpix component type. Parameters ---------- variance: float Variance of the signal, in Kelvin^2, at the reference channel. nside: int HEALPix NSIDE parameter. Must be a power of 2. ref_chan: int Frequency channel to set as reference, if using freqs. ref_zbin: int Redshift bin number to use as reference, if using redshifts. redshifts: numpy.ndarray Redshifts at which to generate maps. Ignored if freqs is provided. freqs: numpy.ndarray Frequencies in Hz, not required if redshifts is passed. Overrides redshifts if passed. Returns ------- pyradiosky.SkyModel A SkyModel instance corresponding a white noise-like EoR signal. Notes ----- Either redshifts or freqs must be provided. The history string of the returned SkyModel gives the expected amplitude. """ # import here to avoid circular imports from pyradiosky import SkyModel if freqs is not None: if not isinstance(freqs, units.Quantity): freqs *= units.Hz if np.any(np.diff(freqs.value) < 0): raise ValueError("freqs must be in ascending order.") redshifts = f21 / freqs.to("Hz").value - 1 ref_z = redshifts[ref_chan] # must sort so that redshifts go in ascending order (opposite freq order) z_order = np.argsort(redshifts) redshifts = redshifts[z_order] freqs = freqs[z_order] ref_zbin = np.where(np.isclose(redshifts, ref_z))[0][0] elif redshifts is not None: if np.any(np.diff(redshifts) < 0): raise ValueError("redshifts must be in ascending order.") freqs = (f21 / (redshifts + 1)) * units.Hz else: raise ValueError("Either redshifts or freqs must be set.") npix = 12 * nside**2 nfreqs = freqs.size omega = 4 * np.pi / npix # Make some noise. stokes = np.zeros((4, npix, nfreqs)) stokes[0, :, :] = np.random.normal(0.0, np.sqrt(variance), (npix, nfreqs)) voxvols = np.zeros(nfreqs) for zi in range(nfreqs - 1): dz = redshifts[zi + 1] - redshifts[zi] vol = ( Planck15.differential_comoving_volume(redshifts[zi]).to_value("Mpc^3/sr") * dz * omega ) voxvols[zi] = vol voxvols[nfreqs - 1] = ( vol # Assume the last redshift bin is the same width as the next-to-last. ) scale = np.sqrt(voxvols / voxvols[ref_zbin]) stokes /= scale stokes = np.swapaxes(stokes, 1, 2) # Put npix in last place again. # sort back to freq order f_order = np.argsort(freqs) freqs = freqs[f_order] stokes = stokes[:, f_order] # The true power spectrum amplitude is variance * reference voxel volume. pspec_amp = variance * voxvols[ref_zbin] history_string = ( f"Generated flat-spectrum model, with spectral amplitude {pspec_amp:.3f} " ) history_string += r"K$^2$ Mpc$^3$" return SkyModel( freq_array=freqs, hpx_inds=np.arange(npix), spectral_type="full", nside=nside, stokes=stokes * units.K, history=history_string, frame=frame, )