"""
Provides a set of functions to handle instrumental effects.
:func:`log_rebin` has been pulled from
:mod:`mangadap.contrib.ppxf_util.py` and modified.
*Source location*:
$MANGADAP_DIR/python/mangadap/util/instrument.py
*Python2/3 compliance*::
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import sys
if sys.version > '3':
long = int
*Imports*::
import warnings
import numpy
from scipy.interpolate import InterpolatedUnivariateSpline
import astropy.constants
*Revision history*:
| **27 May 2015**: Original implementation by K. Westfall (KBW)
based on downgrader_MANGA.f provided by D. Thomas, O. Steele, D.
Wilkinson, D. Goddard.
13 June 2015: D.Wilkinson edit to not calculate unimportant
convolution terms -> runs 5x faster.
"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import sys
if sys.version > '3':
long = int
import warnings
from scipy.interpolate import InterpolatedUnivariateSpline
import numpy as np
from astropy.constants import c as speedOfLight
c = speedOfLight.to('km/s').value
[docs]def where_not(indx, size):
"""
Return a tuple with the indices of a vector that were *not* selected
by a call to `np.where`_. **The function currently only works
for 1D vectors.**
Args:
indx (tuple): Tuple returned by a call to `np.where`_ for a
1D vector.
size (int): Length of the original vector in the call to
`np.where`_.
.. warning::
Performs **no** checks of the input.
"""
return (np.setdiff1d(np.arange(0,size), indx[0]),)
[docs]def spectrum_velocity_scale(wave, log10=False):
"""
Determine the velocity sampling of an input wavelength coordinate
vector. The wavelength vector is assumed to be logarithmically
sampled, but its units are in angstroms.
Args:
wave (np.ndarray): Wavelength coordinates of each spectral
channel in angstroms. It is expected that the spectrum has
been sampled geometrically.
log10 (bool): (Optional) Input spectrum has been sample
geometrically using the base 10 logarithm, instead of the
natural logarithm.
Returns:
float : Velocity scale of the spectrum in km/s.
"""
dl_over_l = (np.log10(wave[1])-np.log10(wave[0]))*np.log(10.0) if log10 else \
np.log(wave[1])-np.log(wave[0])
return c*dl_over_l
[docs]class convolution_integral_element:
"""
Support class for variable sigma convolution. See
:func:`convolution_variable_sigma`.
Args:
y (np.ndarray): Vector to convolve
sigma (np.ndarray): Coordinate-dependent standard deviation of the
Gaussian kernel
ye (np.ndarray): (Optional) Error in the vector to convolve
Raises:
Exception: Raised if *y* is not a 1D vector, or if the shape of
*y* and *sigma* (and *ye* if provided) are different.
Attributes:
x (np.ndarray): Pixel coordinate vector
y (np.ndarray): Vector to convolve
ye (np.ndarray): Error in the vector to convolve
sigma (np.ndarray): Coordinate-dependent standard deviation of the
Gaussian kernel
norm (np.ndarray): Gaussian normalization; calculated once for
efficiency
"""
def __init__(self, y, sigma, ye=None):
if len(y.shape) != 1:
raise Exception('y must be a 1D array!')
if y.shape != sigma.shape:
raise Exception('y and sigma must have the same shape!')
if ye is not None and ye.shape != y.shape:
raise Exception('y and ye must have the same shape!')
self.x = np.arange(sigma.size, dtype=np.float64)
self.y = y
self.ye = ye
self.sigma = sigma
self.norm = np.sqrt(2.0*np.pi)*self.sigma
def _get_kernel(self, xc):
"""Calculate the kernel vector when centered at *xc*."""
div = np.square((self.x-xc)/self.sigma)
close_value = np.where(div < 50.0)
outkern = np.exp(-0.5*div[close_value])/self.norm[close_value]
return close_value,outkern
def __call__(self, xc):
"""
Calculates the weighted mean of :attr:`y`, where the weights are
defined by a Gaussian with standard deviation :attr:`sigma` and
centered at xc.
Args:
xc (float): Center for the Gaussian weighting function
Returns:
float: The weighted mean of :attr:`y`
"""
close_array,kernel = self._get_kernel(xc)
return np.sum(self.y[close_array]*kernel) / np.sum(kernel)
[docs] def error(self, xc):
"""
Calculates the error in the weighted mean of :attr:`y` using
nominal error propagation. The weights are defined by a
Gaussian with standard deviation :attr:`sigma` and centered at
xc.
Args:
xc (float): Center for the Gaussian weighting function
Returns:
float: The error in the weighted mean of :attr:`y`
"""
kernel = self._get_kernel(xc)
return np.sqrt(np.sum(np.square(self.ye*kernel)) / np.sum(kernel))
[docs]def convolution_variable_sigma(y, sigma, ye=None):
r"""
Convolve a discretely sampled function :math:`y(x)` with a Gaussian
kernel, :math:`g`, where the standard deviation of the kernel is a
function of :math:`x`, :math:`\sigma(x)`. Nominal calculations can
be performed to propagate the error in the result; these
calculations **do not** include the covariance between the pixels,
which will mean that the calculations likely have significant error!
The convolution is defined as:
.. math::
(y\ast g)(x) &= \int_{-\infty}^{\infty} y(X)\ g(\sigma,x-X)\ dX \\
&= \int_{-\infty}^{\infty} \frac{y(X)}{\sqrt{2\pi}\
\sigma(X)}\ \exp\left(-\frac{(x-X)^2}{2\
\sigma(X)^2}\right) dX .
To minimize edge effects and account for the censoring of the data
(finite range in :math:`x`), the convolution is actually calculated
as a definite integral and normalized as follows:
.. math::
(y\ast g)(x) \sim\frac{
\int_{x-n_\sigma*\sigma(x)}^{x+n_\sigma*\sigma(x)} y(X)\
g(\sigma,x-X)\ dX}{
\int_{x-n_\sigma*\sigma(x)}^{x+n_\sigma*\sigma(x)}
g(\sigma,x-X)\ dX} .
The above is identical to getting the weighted mean of :math:`y` at
each position :math:`x`, where the weights are defined by a Gaussian
kernel centered at :math:`x` with a variable dispersion.
Use of this function requires:
- *y* and *sigma* must be 1D vectors
- *y* and *sigma* must be uniformly sampled on the same grid
- *sigma* must be in pixel units.
Args:
y (np.ndarray): A uniformly sampled function to convolve.
sigma (np.ndarray): The standard deviation of the Gaussian
kernel sampled at the same positions as *y*. The units of
*sigma* **must** be in pixels.
ye (np.ndarray): (Optional) Errors in the function
:math:`y(x)`.
Returns:
np.ndarray: Arrays with the convolved function :math:`(y\ast
g)(x)` sampled at the same positions as the input :math:`x`
vector and its error. The second array will be returned as
None if the error vector is not provided.
Raises:
Exception: Raised if trying to calculate the errors because they
haven't been implemented yet.
"""
kernel = convolution_integral_element(y,sigma,ye=ye)
conv = np.array(list(map(kernel, kernel.x)))
if ye is None:
return conv
out = np.array(list(map(kernel.error, kernel.x)))
return conv, out
[docs]class spectral_resolution:
r"""
Container class for the resolution, :math:`R =
\lambda/\Delta\lambda`, of a spectrum. The primary functionality is
to determine the parameters necessary to match the resolution of one
spectrum to another. It can also be used as a function to
interpolate the spectral resolution at a given wavelenth.
Args:
wave (np.ndarray): A 1D vector with the wavelength in
angstroms. The sampling may be either in linear steps of
wavelength or :math:`\log_{10}` steps.
sres (np.ndarray): A 1D vector with the spectral resolution,
:math:`R`, sampled at the positions of the provided
wavelength vector.
log10 (bool): (Optional) Flag that the spectrum has been binned
logarithmically (base 10) in wavelength
interp_ext (int or str): (Optional) The value to pass as *ext*
to the interpolator, which defines its behavior when
attempting to sample the spectral resolution beyond where it
is defined. See
`scipy.interpolate.InterpolatedUnivariateSpline`_. Default
is to extrapolate.
Raises:
Exception: Raised if *wave* is not a 1D vector or if *wave* and
*sres* do not have the same shape.
Attributes:
interpolator
(`scipy.interpolate.InterpolatedUnivariateSpline`_): An
object used to interpolate the spectral resolution at any
given wavelength. The interpolation is hard-wired to be
**linear** and its extrapolation behavior is defined by
*interp_ext*. The wavelength and resolution vectors are
held by this object for later reference if needed.
log10 (bool): Flag that the spectrum has been binned
logarithmically (base 10) in wavelength
cnst (:class:`mangadap.util.constants`): Used to define the
conversion factor between a Gaussian sigma and FWHM
c (float): The speed of light; provided by `astropy.constants`_.
dv (float): The velocity step per pixel in km/s. Defined using
:func:`spectrum_velocity_scale` if :attr:`log10` is True;
otherwise set to None.
dw (float): The wavelength step per pixel in angstroms. Defined
as the wavelength step between the first two pixels if
:attr:`log10` is False; otherwise set to None.
min_sig (float): Minimum standard deviation allowed for the
kernel used to match two spectral resolutions. See
:func:`GaussianKernelDifference`.
sig_pd (np.ndarray): The standard deviation in pixels
required to match the spectral resolution of this object to
the resolution defined by a different spectral_resolution
object. See :func:`GaussianKernelDifference`.
sig_mask (np.ndarray): A *uint* vector used to identify
measurements of :attr:`sig_pd` that should **not** be used
to match the spectral resolution of this object to the
resolution defined by a different spectral_resolution
object. See :func:`GaussianKernelDifference`.
sig_vo (float): A constant offset of the kernal standard
deviation **in km/s** that has been applied to
:attr:`sig_pd`. See :func:`GaussianKernelDifference`.
.. todo::
- Allow it to determine if the binning is linear or geometric,
then use the *log10* keyword to distinguish between natural
log and :math:`log_{10}` binning.
- Allow for more than one type of line-spread function?
.. warning::
The default behavior of the interpolator is to extrapolate the
input spectral resolution vector when trying to sample from
regions beyond where it is sampled. Use *interp_ext* change
this; see `scipy.interpolate.InterpolatedUnivariateSpline`_.
.. _scipy.interpolate.InterpolatedUnivariateSpline: http://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.InterpolatedUnivariateSpline.html
.. _astropy.constants: http://docs.astropy.org/en/stable/constants/index.html
"""
def __init__(self, wave, sres, log10=False, interp_ext='extrapolate'):
# Check the sizes
if len(wave.shape) != 1:
raise Exception('wave must be a 1D array!')
if wave.shape != sres.shape:
raise Exception('wave and sres must have the same shape!')
# k=1; always use linear interpolation
self.interpolator = InterpolatedUnivariateSpline(wave, sres, k=1, ext=interp_ext)
self.log10 = log10
# Convert from sigma to FWHM: FWHM = sig2fwhm * sig
self.sig2fwhm = 2.0 * np.sqrt(2.0 * np.log(2.0))
# Convert from radians to arcseconds: arcsec = rad2arcs * radians
self.rad2arcs = 60*60*180/np.pi
# Length of one sidereal year in seconds ()
self.sidereal_year = 31558175.779
self.dv = spectrum_velocity_scale(wave, log10=True) if log10 else None
self.dw = None if log10 else wave[1] - wave[0]
# No resolution matching calculated yet
self.min_sig = None
self.sig_pd = None
self.sig_mask = None
self.sig_vo = None
def __call__(self, w):
"""Interpolate the spectral resolution at wavelength *w*."""
return self.interpolator(w)
def _finalize_GaussianKernelDifference(self, sig2_pd):
r"""
Given the calculated :math:`\sigma^2_{p,d}`, calculate and save
the attributes :attr:`sig_pd` and :attr:`sig_mask`. See
:func:`GaussianKernelDifference`.
"""
indx = np.where(np.isclose(sig2_pd, 0.0))
nindx = where_not(indx, sig2_pd.size)
self.sig_pd = np.copy(sig2_pd)
self.sig_pd[nindx] = sig2_pd[nindx]/np.sqrt(np.absolute(sig2_pd[nindx]))
self.sig_pd[indx] = 0.0
self.sig_mask = np.array(self.sig_pd < -self.min_sig).astype(np.uint)
def _convert_vd2pd(self, sig2_vd):
r"""
Convert from :math:`\sigma^2_{v,d}` to :math:`\sigma^2_{p,d}`.
See :func:`GaussianKernelDifference`.
"""
return sig2_vd / np.square(self.dv) if self.log10 else \
sig2_vd / np.square(c*self.dw/self.wave())
def _convert_pd2vd(self, sig2_pd):
r"""
Convert from :math:`\sigma^2_{p,d}` to :math:`\sigma^2_{v,d}`.
See :func:`GaussianKernelDifference`.
"""
return sig2_pd * np.square(self.dv) if self.log10 else \
sig2_pd * np.square(c*self.dw/self.wave())
[docs] def wave(self):
"""
Return the wavelength vector; held by :attr:`interpolator`.
"""
return self.interpolator._data[0]
[docs] def sres(self):
"""
Return the resolution vector; held by :attr:`interpolator`.
"""
return self.interpolator._data[1]
[docs] def match(self, new_sres, no_offset=True, min_sig_pix=0.0):
"""
Currently only an alias for :func:`GaussianKernelDifference`.
"""
self.GaussianKernelDifference(new_sres, no_offset=no_offset, min_sig_pix=min_sig_pix)
[docs] def GaussianKernelDifference(self, new_sres, no_offset=True, min_sig_pix=0.0):
r"""
Determine the parameters of a Gaussian kernel required to
convert the resolution of this object to the resolution of a
different the :class:`spectral_resolution` object, *new_sres*.
The spectral resolution is defined as :math:`R =
\lambda/\Delta\lambda`, where :math:`\Delta\lambda` is the FWHM
of the spectral resolution element. The standard deviation of
the resolution element in angstroms is then
.. math::
\sigma_\lambda = \frac{\lambda}{f R}, \ \ {\rm where} \ \ f
= \frac{{\rm FWHM_\lambda}}{\sigma_\lambda}.
Assuming a Gaussian (in angstroms) line-spread function:
.. math::
\sigma^2_{\lambda,2} = \sigma^2_{\lambda,1} +
\sigma^2_{\lambda,d}
such that
.. math::
\sigma^2_{\lambda,d} = \left(\frac{\lambda}{f}\right)^2
(R^{-2}_2 - R^{-2}_1)
is the defining parameter of the Gaussian kernel needed to take
a spectrum of resolution :math:`R_1` to one with a resolution of
:math:`R_2`.
For input to :func:`convolution_variable_sigma`, the
*wavelength-dependent* parameter of the Gaussian kernel is
converted to pixels. This function allows for the input spectra
to be linearly sampled in angstroms or log10(angstroms). For
the former (*log10=False*),
.. math::
\sigma^2_{p,d} = \left(\frac{\lambda}{f\
\delta\lambda}\right)^2 (R^{-2}_2 - R^{-2}_1)
where :math:`\delta\lambda` is the size of the pixel in
angstroms. If the units are log10(angstrom) (*log10=True*), we
approximate the velocity width of each pixel to be :math:`\delta
v = c \ln(10.0) (\log\lambda[1]-\log\lambda[0])`, such that
.. math::
\sigma^2_{p,d} &= \left(\frac{c}{ \delta v \lambda}\right)^2
\sigma^2_{\lambda,d} \\ &= \left(\frac{c}{ f\ \delta
v}\right)^2 (R^{-2}_2 - R^{-2}_1)\ ;
:math:`c` is the speed of light in km/s.
The nominal use of this algorithm assumes :math:`R_1 \geq R_2`.
However, in practice, :func:`convolution_variable_sigma` only
uses a Gaussian kernel up to some minimum value of
:math:`\epsilon_\sigma`; below this, the kernel is assumed to be
a Delta function. Therefore, as long as
.. math::
\sigma_{p,d} \equiv \sigma^2_{p,d}/\sqrt{|\sigma^2_{p,d}|}
\geq -\epsilon_\sigma\ ,
the behavior of :func:`convolution_variable_sigma` should not be
affected.
Even so, there may be spectral regions that do not have
:math:`\sigma_{p,d} \geq -\epsilon_\sigma`; for such spectral
regions there are three choices:
(**Option 1**) trim the spectral range to only those
spectral regions where the existing resolution is better
than the target resolution,
(**Option 2**) match the existing resolution to the target
resolution up to some constant offset that must be accounted
for in subsequent analyses, or
(**Option 3**) allow for a wavelength dependent difference
in the spectral resolution that must be accounted for in
subsequent analyses.
The choice of either Option 1 or 2 is selected by setting
*no_offset* to, respectively, True or False; Option 1 is the
default behavior. Currently, Option 3 is not allowed.
For Option 1, pixels with :math:`\sigma_{p,d} <
-\epsilon_\sigma` are masked (*sigma_mask = 1*); however, the
returned values of :math:`\sigma_{p,d}` are left unchanged.
For Option 2, we define
.. math::
\sigma^2_{v,o} = -{\rm min}(\sigma^2_{v,d}) - {\rm
max}(\epsilon_\sigma \delta v)^2
where :math:`\delta v` is constant for the logarithmically
binned spectrum and is wavelength dependent for the linearly
binned spectra; in the latter case, the velocity step is
determined for each pixel::
_wave = self.wave()
dv = c * (2.0*(_wave[1:] - _wave[0:-1]) / (_wave[1:] + _wave[0:-1]))
If :math:`\sigma^2_{v,o} > 0.0`, it must be that :math:`{\rm
min}(\sigma^2_{v,d}) < -{\rm max}(\epsilon_\sigma \delta v)^2`,
such that an offset should be applied. In that case, the
returned kernel parameters are
.. math::
\sigma^\prime_{v,d} = \sqrt{\sigma^2_{v,d} +
\sigma^2_{v,o}}\ .
with the units converted to pixels using the equations above, no
pixels are masked, and :math:`\sqrt{\sigma^2_{v,o}}` is returned
for the offset. Otherwise, the offset is set to 0.
.. todo::
Allow to check cases when the convolution kernel is
indpendent of wavelength such that the convolution can be
sped up by performing the convolution using an FFT. For
example, in the case where the spectrum is logarithmically
binned and both :math:`R_1` and :math:`R_2` are
*independent* of wavelength, the convolution kernel is
independent of wavelength.
Args:
new_sres (:class:`spectral_resolution`): Spectral resolution
to match to.
no_offset (bool): (Optional) Force :math:`\sigma^2_{v,o} =
0` by masking regions with :math:`\sigma_{p,d} <
-\epsilon_\sigma`; i.e., the value of this arguments
selects Option 1 (True) or Option 2 (False).
min_sig_pix (float): (Optional) Minimum value of the
standard deviation allowed before assuming the kernel is
a Delta function.
"""
# Save the minimum pixel sigma to allow
self.min_sig = min_sig_pix
# Interpolate the new spectral resolution vector at the wavelengths
# of the input spectral resolution
_wave = self.wave()
_sres = self.sres()
interp_sres = new_sres(_wave)
# Determine the variance (in angstroms) of Gaussian needed to match
# input resolution to the new values
sig2_wd = np.square(_wave/self.sig2fwhm) \
* (1.0/np.square(interp_sres) - 1.0/np.square(_sres))
# Convert to km/s
sig2_vd = np.square(c/_wave) * sig2_wd
# Option 1:
if no_offset:
# Convert the variance to pixel coordinates
sig2_pd = sig2_vd / np.square(self.dv) if self.log10 else \
sig2_wd / np.square(self.dw)
# No offset applied
self.sig_vo = 0.0
# Option 2:
else:
# Calculate the velocity step of each pixel
dv = c * (2.0*(_wave[1:] - _wave[0:-1]) / (_wave[1:] + _wave[0:-1]))
# Get the needed *velocity* offset (this is the square)
self.sig_vo = - np.amin(sig2_vd) - np.square(self.min_sig * np.amax(dv))
# Apply it if it's larger than 0
if self.sig_vo > 0:
sig2_vd += self.sig_vo
self.sig_vo = np.sqrt(self.sig_vo)
else:
self.sig_vo = 0.0
# Convert the variance to pixel coordinates
sig2_pd = self._convert_vd2pd(sig2_vd)
self._finalize_GaussianKernelDifference(sig2_pd)
[docs] def offset_GaussianKernelDifference(self, offset):
r"""
If the properties required to match the resolution of one
spectrum to another has already been calculated (see
:func:`GaussianKernelDifference`), this allows for one to apply
an additional offset. The additional offset **must** be in km/s
(not pixels).
The offset is applied in quadrature; however, the offset can be
negative such that one can reduce :attr:`sig_pd`. Once
converted to km/s, the offset is applied by calculating:
.. math::
\sigma^{\prime\ 2}_{v,d} = \sigma^{2}_{v,d} +
\sigma_{off}|\sigma_{off}|\ .
:attr:`sig_vo` is adjusted in the same way, and the change in
:math:`\sigma^{\prime\ 2}_{v,d}` is then propagated to
:attr:`sig_pd` and :attr:`sig_mask`.
Args:
offset (float): Value of the standard deviation in km/s to
add in quadrature to a previously calculated
:attr:`sig_pd`.
Raises:
Exception: Raised if the kernel properties have not yet been
defined.
"""
if None in [self.min_sig, self.sig_pd, self.sig_mask, self.sig_vo]:
raise Exception('No kernel defined yet. Run GaussianKernelDifference first.')
if np.isclose(offset,0.0):
return
off2 = offset*np.absolute(offset)
sig2_vo = self.sig_vo*np.absolute(self.sig_vo) + off2
self.sig_vo = 0.0 if np.isclose(sig2_vo, 0.0) else sig2_vo/np.sqrt(np.absolute(sig2_vo))
sig2_vd = self._convert_pd2vd(self.sig_pd*np.absolute(sig_pd)) + off2
self._finalize_GaussianKernelDifference(self._convert_vd2pd(sig2_vd))
[docs] def adjusted_resolution(self, indx=None):
r"""
Return the resolution that should result from applying
:func:`convolution_variable_sigma` to the spectrum associated
with this spectral resolution object using :attr:`sigma_pd`.
I.e., calculate:
.. math::
R_2 = \left[ \left(\frac{f}{c}\right)^2 \sigma^2_{v,d} +
R^{-2}_1\right]^{-1/2}\ .
Args:
indx (tuple): (Optional) Selection tuple used to return a
subset of the full resolution vector.
Returns:
np.ndarray: The (full or selected) vector with the
adjusted resolution.
.. todo::
Allow to reset the resolution of this object to the adjusted
resolution and reset the kernel variables to None.
"""
if indx is None:
return 1.0/np.sqrt( np.square(self.sig2fwhm/c) * self._convert_pd2vd(self.sig_pd*np.absolute(self.sig_pd)) + 1.0/np.square(self.sres()) )
return 1.0/np.sqrt( np.square(self.sig2fwhm/c) * (self._convert_pd2vd(self.sig_pd*np.absolute(self.sig_pd)))[indx] + 1.0/np.square(self.sres()[indx]) )
[docs]def match_spectral_resolution(wave, flux, sres, new_sres_wave, new_sres, ivar=None, mask=None, min_sig_pix=0.0, no_offset=True, variable_offset=False, log10=False, new_log10=False):
r"""
Adjust the existing spectral resolution of to a **lower** resolution
as best as possible. The primary functionality is in
:class:`spectral_resolution`, which determines the Gaussian kernel
parameters needed to match the resolution, and
:func:`convolve_variable_sigma`, which actually performs the
convolution to match the resolution.
In particular, see
:func:`spectral_resolution.GaussianKernelDifference` for a
description of how the kernel parameters are determined and how
regions where the target resolution is **higher** than the existing
resolution. In this case, one of the options is to adopt an offset
of the resolution (in km/s) that could be corrected for in
subsequent analysis. In this case, setting *variable_offset* to
True allows the offset to be different for all input spectra. If
one expects to combine the spectra, the default behavior should be
used, forcing all the spectra to have a constant offset.
Args:
wave (np.ndarray): A 1D or 2D (:math:`N_{\rm spec}\times
N_{\rm pix}`) array with the wavelength in angstroms for a
set of spectra. The sampling may be either in linear steps
of wavelength or :math:`\log_{10}` steps, as set using
*log10*.
flux (np.ndarray): A 1D or 2D (:math:`N_{\rm spec}\times
N_{\rm pix}`) array with the flux sampled at the provided
wavelengths.
sres (np.ndarray): A 1D or 2D (:math:`N_{\rm spec}\times
N_{\rm pix}`) array with the spectral resolution, :math:`R`,
at the provided wavelengths.
new_sres_wave (np.ndarray): A 1D vector with the wavelength
in angstroms at which the new resolution of the input
spectra has been sampled. The sampling may be either in
linear steps of wavelength or :math:`\log_{10}` steps, as
set using *new_log10*.
new_sres (np.ndarray): A 1D vector with the new resolution
for the input spectra.
ivar (np.ndarray): (Optional) A 1D or 2D (:math:`N_{\rm
spec}\times N_{\rm pix}`) array with the inverse variance of
the flux sampled at the provided wavelengths. **Currently
never used and, if provided, the output inverse variances
are simply a carbon copy of this array.**
mask (np.ndarray): (Optional) A 1D or 2D (:math:`N_{\rm
spec}\times N_{\rm pix}`) array with a *uint* mask for the
flux sampled at the provided wavelengths.
no_offset (bool): (Optional) Force :math:`\sigma^2_{v,o} = 0` by
masking regions with :math:`\sigma_{p,d} <
-\epsilon_\sigma`; i.e., the value of this arguments selects
Option 1 (True) or Option 2 (False). See
:func:`spectral_resolution.GaussianKernelDifference`.
min_sig_pix (float): (Optional) Minimum value of the standard
deviation in pixels allowed before assuming the kernel is a
Delta function.
variable_offset (bool): (Optional) Flag to allow the offset
applied to each spectrum (when the input contains more than
one spectraum) to be tailored to each spectrum. Otherwise
(*variable_offset=False*) the offset is forced to be the
same for all spectra.
log10 (bool): (Optional) Flag that the spectrum has been binned
logarithmically (base 10) in wavelength
new_log10 (bool): (Optional) Flag that the coordinates of the
new spectral resolution are spectrum as been binned
logarithmically (base 10) in wavelength.
Returns:
np.ndarray : Four or Five arrays are returned:
- A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array
with the resolution-matched flux sampled at the input
wavelengths.
- A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array
with the spectral resolution, :math:`R`, of the
resolution-matched spectra at the provided wavelengths.
- A 1D vector with any constant offset in resolution **in
km/s** between the targetted value and the result. See
:func:`spectral_resolution.GaussianKernelDifference`.
- A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm pix}`) array
with a *uint* mask for the resolution-matched flux sampled
at the input wavelengths. This is returned regardless of
whether an input mask was provided. Any pixel that had a
resolution that was lower than the target resolution (up
to some tolerance defined by *min_sig_pix*) is returned as
masked.
- (Optional) A 1D or 2D (:math:`N_{\rm spec}\times N_{\rm
pix}`) array with the inverse variance of the
resolution-matched flux sampled at the input wavelengths.
**Only output if *ivar* is provided, and in that case this
is just a carbon copy of the input array.**
Raises:
Exception: Raised if:
- the input *wave* array is 2D and the *sres* array is not;
a 1D wavelength array is allowed for a 2D *sres* array but
not vice versa
- the number of spectral pixels in *wave*, *flux*, and
*sres* is not the same
- the shape of the *flux*, *mask* (if provided), and *ivar*
(if provided) are not the same
- the shape of the *new_sres_wave* and *new_sres* arrays
are not the same and/or not 1D
.. todo::
- Add interp_ext != 'extrapolate' option?
- Better way to use warnings?
"""
# Check the dimensionality of wave and sres
wave_matrix = len(wave.shape) == 2
sres_matrix = len(sres.shape) == 2
if wave_matrix and not sres_matrix:
raise Exception('If input wavelength array is 2D, the spectral resolution array must' ' also be 2D')
# Check the shapes
if (wave_matrix == sres_matrix and wave.shape != sres.shape) or \
(not wave_matrix and sres_matrix and wave.shape[0] != sres.shape[1]):
raise Exception('Input spectral resolution and coordinate arrays must have the same' ' number of spectral channels!')
if (wave_matrix and wave.shape != flux.shape) or \
(not wave_matrix and len(flux.shape) == 2 and wave.shape[0] != flux.shape[1]) or \
(not wave_matrix and len(flux.shape) == 1 and wave.shape != flux.shape):
raise Exception('Input flux and coordinate arrays must have the same number of' ' spectral channels!')
if (mask is not None and mask.shape != flux.shape):
raise Exception('Input flux and mask arrays must have the same shape!')
if (ivar is not None and ivar.shape != flux.shape):
raise Exception('Input flux and ivar arrays must have the same shape!')
if len(new_sres_wave.shape) != 1 or len(new_sres.shape) != 1:
raise Exception('New spectral resolution and coordinate arrays must be 1D!')
if new_sres_wave.shape != new_sres.shape:
raise Exception('New spectral resolution and coordinate arrays must have the same shape!')
# Raise a warning if the new_sres vector will have to be
# extrapolated for the input wavelengths
if np.amin(wave) < new_sres_wave[0] or np.amax(wave) > new_sres_wave[-1]:
warnings.warn('Mapping to the new spectral resolution will require extrapolating the' \
' provided input vectors!')
# Initialize some variables
nspec = 1 if len(sres.shape) == 1 else sres.shape[0]
sigma_offset = np.zeros(nspec, dtype=np.float64)
new_res = spectral_resolution(new_sres_wave, new_sres, log10=new_log10)
res = []
# Get the kernel parameters necessary to match all spectra to the
# new resolution
if nspec == 1:
res = [spectral_resolution(wave, sres, log10=log10)]
res[0].match(new_res, no_offset=no_offset, min_sig_pix=min_sig_pix)
sigma_offset[0] = res[0].sig_vo
else:
for i in range(0,nspec):
_wave = wave[i,:].ravel() if wave_matrix else wave
_sres = sres[i,:].ravel() if sres_matrix else sres
res = res + [spectral_resolution(_wave, _sres, log10=log10)]
res[i].match(new_res, no_offset=no_offset, min_sig_pix=min_sig_pix)
sigma_offset[i] = res[i].sig_vo
# Force all the offsets to be the same, if requested
if not no_offset and not variable_offset:
common_offset = np.max(sigma_offset)
offset_diff = np.sqrt( np.square(common_offset) - np.square(sigma_offset))
for r, o in zip(res,offset_diff):
r.offset_GaussianKernelDifference(o)
# Perform the convolutions
out_flux = np.copy(flux)
out_sres = np.copy(sres)
if mask is None:
mask = np.zeros(flux.shape, dtype=np.uint)
out_mask = np.copy(mask)
if nspec == 1:
indx = np.where(res[0].sig_pd > min_sig_pix)
out_flux[indx] = convolution_variable_sigma(flux[indx], res[0].sig_pd[indx])
out_sres[indx] = res[0].adjusted_resolution(indx=indx)
out_mask = np.array((res[0].sig_mask == 1) | (mask == 1)).astype(np.uint)
else:
for i in range(0,nspec):
print('Matching resolution: {0}/{1}'.format(i+1,nspec))
indx = np.where(res[i].sig_pd > min_sig_pix)
out_flux[i,indx] = convolution_variable_sigma(flux[i,indx].ravel(), res[i].sig_pd[indx])
out_sres[i,indx] = res[i].adjusted_resolution(indx=indx)
out_mask[i,:] = np.array((res[i].sig_mask == 1) | (mask[i,:] == 1)).astype(np.uint)
# TODO: Add this functionality from the IDL version?
#
# Finally, the code masks a number of pixels at the beginning and
# end of the spectra to remove regions affected by errors in the
# convolution due to the censoring of the data. The number of
# pixels is the FWHM of the largest Gaussian applied in the
# convolution: ceil(sig2fwhm*max(diff_sig_w)/dw). This is currently
# hard-wired and should be tested.
if ivar is not None:
out_ivar = np.copy(ivar)
# Handle the inverse variances then return the answers
return out_flux, out_sres, sigma_offset, out_mask, out_ivar
return out_flux, out_sres, sigma_offset, out_mask
[docs]def log_rebin(lamRange, spec, oversample=None, velscale=None, flux=False, log10=False,
newRange=None, wave_in_ang=False, unobs=0.0):
"""
.. note::
Copyright (C) 2001-2014, Michele Cappellari
E-mail: cappellari_at_astro.ox.ac.uk
This software is provided as is without any warranty whatsoever.
Permission to use, for non-commercial purposes is granted.
Permission to modify for personal or internal use is granted,
provided this copyright and disclaimer are included unchanged at
the beginning of the file. All other rights are reserved.
Logarithmically rebin a spectrum, while rigorously conserving the
flux. Basically the photons in the spectrum are simply
ridistributed according to a new grid of pixels, with non-uniform
size in the spectral direction.
This routine makes the `standard' zero-order assumption that the
spectrum is *constant* within each pixels. It is possible to perform
log-rebinning by assuming the spectrum is represented by a
piece-wise polynomial of higer degree, while still obtaining a
uniquely defined linear problem, but this reduces to a deconvolution
and amplifies noise.
.. warning::
This assumption can be poor for sharp features in the spectrum.
Beware if resampling spectra with strong, marginally sampled
features!
This same routine can be used to compute approximate errors of the
log-rebinned spectrum. To do this type the command
>>> err2New, logLam, velscale = log_rebin(lamRange, np.square(err))
and the desired errors will be given by np.sqrt(err2New).
.. warning::
This rebinning of the error-spectrum is very *approximate* as it
does not consider the correlation introduced by the rebinning!
Args:
lamRange (np.ndarray): two elements vector containing the
central wavelength of the first and last pixels in the
spectrum, which is assumed to have constant wavelength
scale! E.g. from the values in the standard FITS keywords:
LAMRANGE = CRVAL1 + [0,CDELT1*(NAXIS1-1)]. It must be
LAMRANGE[0] < LAMRANGE[1].
spec (np.ndarray): Input spectrum.
oversample (int): (Optional) Oversampling can be done, not to
loose spectral resolution, especally for extended wavelength
ranges and to avoid aliasing. Default is to provide the
same number of output pixels as input.
velscale (float): Velocity scale in km/s per pixels. If this
variable is not defined, then it will contain in output the
velocity scale. If this variable is defined by the user it
will be used to set the output number of pixels and
wavelength scale.
flux (bool): (Optional) Set this keyword to preserve total flux.
In this case the log rebinning changes the pixels flux in
proportion to their dLam so the following command will show
large differences beween the spectral shape before and after
:func:`log_rebin`::
# Plot log-rebinned spectrum
pyplot.plot(exp(logLam), specNew)
pyplot.plot(np.arange(lamRange[0],lamRange[1],spec.size), spec, 'g')
pyplot.show()
By default, when this keyword is *not* set, the above two
lines produce two spectra that almost perfectly overlap each
other.
log10 (bool): (Optional) Flag that the spectrum should be binned
in units of base-10 log wavelength, instead of natural log
newRange (np.ndarray): (Optional) Force the spectrum to be
sampled to a new spectral range (lamRange is the *existing*
spectral range).
wave_in_ang (bool): (Optional) Return the wavelength coordinates
in angstroms, not log(angstroms)
unobs (float): (Optional) Default value for unobserved spectral
regions.
Returns:
np.ndarray, float : Returns three variables: logarithmically
rebinned spectrum, the log of the wavelength at the
geometric center of each pixel, and the velocity scale of
each pixel in km/s.
Raises:
ValueError: Raised if the input spectrum is not a
one-dimensional np.ndarray.
*Modification History*:
| **V1.0.0**: Using interpolation. Michele Cappellari, Leiden,
22 October 2001
| **V2.0.0**: Analytic flux conservation. MC, Potsdam, 15 June
2003
| **V2.1.0**: Allow a velocity scale to be specified by the
user. MC, Leiden, 2 August 2003
| **V2.2.0**: Output the optional logarithmically spaced
wavelength at the geometric mean of the wavelength at the
border of each pixel. Thanks to Jesus Falcon-Barroso. MC,
Leiden, 5 November 2003
| **V2.2.1**: Verify that lamRange[0] < lamRange[1]. MC,
Vicenza, 29 December 2004
| **V2.2.2**: Modified the documentation after feedback from
James Price. MC, Oxford, 21 October 2010
| **V2.3.0**: By default now preserve the shape of the spectrum,
not the total flux. This seems what most users expect from
the procedure. Set the keyword /FLUX to preserve flux like
in previous version. MC, Oxford, 30 November 2011
| **V3.0.0**: Translated from IDL into Python. MC, Santiago, 23
November 2013
| **V3.1.0**: Fully vectorized log_rebin. Typical speed up by
two orders of magnitude. MC, Oxford, 4 March 2014
| **05 Jun 2015**: (K. Westfall, KBW) Pulled from ppxf_util.py.
Conform to mangadap documentation standard. Transcribe
edits made to IDL version that provides for the log10 and
newRange arguments. Add option to return wavelength in
angstroms, not log(angstroms). Break out determination of
input and output spectrum pixel coordinates to a new
function, :func:`log_rebin_pix`. Added default value for
unobserved pixels. Default behavior unchanged.
.. todo::
- Allow to resample an already geometrically binned spectrum
"""
lamRange = np.asarray(lamRange)
if type(spec) != np.ndarray:
raise ValueError('Input spectrum must be a np.ndarray')
s = spec.shape
if len(s) != 1:
raise ValueError('input spectrum must be a vector')
n = s[0]
# This is broken out into a separate procedure so that it can be
# called to determine the size of the rebinned spectra without
# actually doing the rebinning
dLam, m, logscale, velscale = \
log_rebin_pix(lamRange, n, oversample=oversample, velscale=velscale, log10=log10,
newRange=newRange)
# Get the sampling of the existing spectrum
lim = lamRange/dLam + [-0.5, 0.5] # All in units of dLam
borders = np.linspace(*lim, num=n+1) # Linearly sampled pixels
# Set limits to a new wavelength range
if newRange is not None:
lim = np.asarray(newRange)/dLam + [-0.5, 0.5]
# Set the limits to the (base-10 or natural) log of the wavelength
logLim = np.log(lim) if not log10 else np.log10(lim)
logLim[1] = logLim[0] + m*logscale # Set last wavelength, based on integer # of pixels
# Geometrically spaced pixel borders for the new spectrum
newBorders = np.power(10., np.linspace(*logLim, num=m+1)) if log10 else \
np.exp(np.linspace(*logLim, num=m+1))
# Get the new spectrum by performing an analytic integral
k = (newBorders - borders[0]).clip(0, n-1).astype(int)
specNew = np.add.reduceat(spec, k)[:-1]
specNew *= np.diff(k) > 0 # fix for design flaw of reduceat()
specNew += np.diff((newBorders - borders[k])*spec[k])
# Don't conserve the flux
if not flux:
specNew /= np.diff(newBorders)
# Output log(wavelength): log of geometric mean
LamNew = np.sqrt(newBorders[1:]*newBorders[:-1])*dLam
# Set values for unobserved regions
if newRange is not None and (newRange[0] < lamRange[0] or newRange[1] > lamRange[1]):
specNew[ (LamNew < lamRange[0]) | (LamNew > lamRange[1]) ] = unobs
# Return log(wavelength), if requested
if not wave_in_ang:
LamNew = np.log10(LamNew) if log10 else np.log(LamNew)
# Return spectrum, wavelength coordinates, and pixel size in km/s
return specNew, LamNew, velscale
[docs]def log_rebin_pix(lamRange, n, oversample=None, velscale=None, log10=False, newRange=None):
"""
Determine the number of new pixels and their coordinate step when
rebinning a spectrum in geometrically stepped bins. The input
spectrum must be sampled linearly in wavelength. This is primarily
a support routine for :func:`log_rebin`.
Although breaking this out from the main :func:`log_rebin` function
leads to a few repeat calculations in that function, the use of this
function is in determine a common wavelength range for a large
number of spectra before resampling the spectra themselves. See
:class:`mangadap.TplLibrary` .
Args:
lamRange (np.ndarray): two elements vector containing the
central wavelength of the first and last pixels in the
spectrum, which is assumed to have constant wavelength
scale! E.g. from the values in the standard FITS keywords:
LAMRANGE = CRVAL1 + [0,CDELT1*(NAXIS1-1)]. It must be
LAMRANGE[0] < LAMRANGE[1].
n (int): Number of pixels in the original spectrum.
oversample (int): (Optional) Oversampling can be done, not to
loose spectral resolution, especally for extended wavelength
ranges and to avoid aliasing. Default is to provide the
same number of output pixels as input.
velscale (float): Velocity scale in km/s per pixels. If this
variable is not defined, then it will contain in output the
velocity scale. If this variable is defined by the user it
will be used to set the output number of pixels and
wavelength scale.
log10 (bool): (Optional) Flag that the spectrum should be binned
in units of base-10 log wavelength, instead of natural log
newRange (np.ndarray): (Optional) Force the spectrum to be
sampled to a new spectral range (lamRange is the *existing*
spectral range).
Returns:
float, int : Returns (1) the linear wavelength step of each
pixel in the input spectrum, (2) the number of pixels for
the rebinned spectrum, (3) the log-linear wavelength step
for each pixel in the new spectrum, (4) the velocity step
for each pixel in the new spectrum.
Raises:
ValueError: Raised if the input wavelength range (*lamRange* or
*newRange*) does not have two elements or is not sorted.
"""
lamRange = np.asarray(lamRange)
if len(lamRange) != 2:
raise ValueError('lamRange must contain two elements')
if lamRange[0] >= lamRange[1]:
raise ValueError('It must be lamRange[0] < lamRange[1]')
# Size of output spectrum
m = int(n) if oversample is None else int(n*oversample)
# Get the sampling of the existing spectrum
dLam = np.diff(lamRange)/(n - 1.) # Assume constant dLam
lim = lamRange/dLam + [-0.5, 0.5] # All in units of dLam
# Get the sampling for the new spectrum, if requested to be
# different
if newRange is not None:
newRange = np.asarray(newRange)
if len(newRange) != 2:
raise ValueError('newRange must contain two elements')
if newRange[0] >= newRange[1]:
raise ValueError('It must be newRange[0] < newRange[1]')
lim = newRange/dLam + [-0.5, 0.5] # Set limits to a new wavelength range
# Adjust the length
nn = int((lamRange[1]-lamRange[0]-newRange[1]+newRange[0])/dLam)
m = m-nn if oversample is None else m-nn*oversample
# Set the limits to the (base-10 or natural) log of the wavelength
logLim = np.log(lim) if not log10 else np.log10(lim)
# Set the velocity scale, if velscale not provided; otherwise force
# the sampling based in the input velscale
if velscale is None: # Velocity scale is not set by user
velscale = np.diff(logLim)[0]/m*c # Only for output
if log10:
velscale *= np.log(10.) # Adjust to log base-10
logscale = velscale/c # dlambda/lambda = dln(lambda)
if log10:
logscale /= np.log(10.) # Convert to dlog10(lambda)
m = int(np.diff(logLim)/logscale) # Number of output pixels
return dLam, m, logscale, velscale
[docs]def downgrade(wave, flux, deltal_in, sigma_galaxy, wave_instrument, r_instrument):
"""
Adapted from the manga DAP downgrader from Kyle Westfall.
Downgrades an input spectrum to a given galaxy velocity dispersion
using the input SEDs resolution and the resolution of the observation.
Returns flux of downgraded SED.
"""
sig2fwhm = 2.0 * np.sqrt(2.0 * np.log(2.0))
fwhm = deltal_in/wave*c
sigma = fwhm/sig2fwhm
sres = wave/deltal_in
new_sig = np.zeros(wave.shape, dtype=np.float64)
# match wavelength between model and instrument to downgrade
def find_nearest(array,value):
idx = (np.abs(array-value)).argmin()
return idx,array[idx]
for wi,w in enumerate(wave):
index, value = find_nearest(wave_instrument,w)
sig_instrument = c/r_instrument[index]/sig2fwhm
new_sig[wi] = np.sqrt(sigma_galaxy**2.0 +sig_instrument**2.0)
new_fwhm = sig2fwhm * new_sig
new_sres = c / new_fwhm
if len(wave)<5:
raise ValueError("Not enough wavelength points...!")
a_wave = wave[2]-wave[1]
b_wave = wave[3]-wave[2]
if b_wave-a_wave < 0.000001*a_wave:
log_wave = False
else:
log_wave = True
new_flux, matched_sres, sigma_offset, new_mask = match_spectral_resolution(wave, flux, sres, wave, new_sres, min_sig_pix=0.0, log10=log_wave, new_log10=log_wave)
return new_flux