import astropy.units as u
import numpy as np
from astropy import constants as const
from astropy.nddata import StdDevUncertainty
from astropy.units import Quantity
from scipy.signal.windows import tukey
from scipy.signal import correlate
from ..manipulation import LinearInterpolatedResampler
from .. import Spectrum1D
__all__ = ['template_correlate', 'template_logwl_resample']
_KMS = u.Unit('km/s') # for use below without having to create a composite unit
[docs]
def template_correlate(observed_spectrum, template_spectrum, lag_units=_KMS,
apodization_window=0.5, resample=True, method="direct"):
"""
Compute cross-correlation of the observed and template spectra.
After re-sampling into log-wavelength, both observed and template
spectra are apodized by a Tukey window in order to minimize edge
and consequent non-periodicity effects and thus decrease
high-frequency power in the correlation function. To turn off the
apodization, use alpha=0.
Parameters
----------
observed_spectrum : :class:`~specutils.Spectrum1D`
The observed spectrum.
template_spectrum : :class:`~specutils.Spectrum1D`
The template spectrum, which will be correlated with
the observed spectrum.
lag_units: `~astropy.units.Unit`
Must be a unit with velocity physical type for lags in velocity. To
output the lags in redshift, use ``u.dimensionless_unscaled``.
apodization_window: float, callable, or None
If a callable, will be treated as a window function for apodization of
the cross-correlation (should behave like a `~scipy.signal.windows`
window function, with ``sym=True``). If a float, will be treated as the
``alpha`` parameter for a Tukey window (`~scipy.signal.windows.tukey`),
in units of pixels. If None, no apodization will be performed
resample: bool or dict
If True or a dictionary, resamples the spectrum and template following
the process in `template_logwl_resample`. If a dictionary, it will be
used as the keywords for `template_logwl_resample`. For example,
``resample=dict(delta_log_wavelength=.1)`` would be the same as calling
``template_logwl_resample(spectrum, template, delta_log_wavelength=.1)``.
If False, *no* resampling is performed (and the user is responsible for
a sensible resampling).
method: str
If you choose "FFT", the correlation will be done through the use
of convolution and will be calculated faster (for small spectral
resolutions it is often correct), otherwise the correlation is determined
directly from sums (the "direct" method in `~scipy.signal.correlate`).
Returns
-------
(`~astropy.units.Quantity`, `~astropy.units.Quantity`)
Arrays with correlation values and lags in km/s
"""
# resample if the user requested to log wavelength
if resample:
if resample is True:
resample_kwargs = dict() # use defaults
else:
resample_kwargs = resample
log_spectrum, log_template = template_logwl_resample(observed_spectrum,
template_spectrum,
**resample_kwargs)
else:
log_spectrum = observed_spectrum
log_template = template_spectrum
# apodize (might be a no-op if apodization_window is None)
observed_log_spectrum, template_log_spectrum = _apodize(log_spectrum,
log_template,
apodization_window)
# Normalize template
normalization = _normalize(observed_log_spectrum, template_log_spectrum)
# Not sure if we need to actually normalize the template. Depending
# on the specific data uncertainty, the normalization factor
# may turn out negative. That causes a flip of the correlation function,
# in which the maximum (correlation peak) is no longer meaningful.
if normalization < 0.:
normalization = 1.
# Correlate
if method.lower() == "fft":
corr = correlate(observed_log_spectrum.flux.value,
(template_log_spectrum.flux.value * normalization),
method="fft")
else:
corr = correlate(observed_log_spectrum.flux.value,
(template_log_spectrum.flux.value * normalization),
method="direct")
# Compute lag
# wave_l is the wavelength array equally spaced in log space.
wave_l = observed_log_spectrum.spectral_axis.value
delta_log_wave = np.log10(wave_l[1]) - np.log10(wave_l[0])
deltas = (np.array(range(len(corr))) - len(corr)/2 + 0.5) * delta_log_wave
lags = np.power(10., deltas) - 1.
if u.dimensionless_unscaled.is_equivalent(lag_units):
lags = Quantity(lags, u.dimensionless_unscaled)
elif _KMS.is_equivalent(lag_units):
lags = lags * const.c.to(lag_units)
else:
raise u.UnitsError('lag_units must be either velocity or dimensionless')
return corr * u.dimensionless_unscaled, lags
def _apodize(spectrum, template, apodization_window):
# Apodization. Must be performed after resampling.
if apodization_window is None:
clean_spectrum = spectrum
clean_template = template
else:
if callable(apodization_window):
window = apodization_window
else:
def window(wlen):
return tukey(wlen, alpha=apodization_window)
clean_spectrum = spectrum * window(len(spectrum.spectral_axis))
clean_template = template * window(len(template.spectral_axis))
return clean_spectrum, clean_template
[docs]
def template_logwl_resample(spectrum, template, wblue=None, wred=None,
delta_log_wavelength=None,
resampler=LinearInterpolatedResampler()):
"""
Resample a spectrum and template onto a common log-spaced spectral grid.
If wavelength limits are not provided, the function will use
the limits of the merged (observed+template) wavelength scale
for building the log-wavelength scale.
For the wavelength step, the function uses either the smallest wavelength
interval found in the *observed* spectrum, or takes it from the
``delta_log_wavelength`` parameter.
Parameters
----------
observed_spectrum : :class:`~specutils.Spectrum1D`
The observed spectrum.
template_spectrum : :class:`~specutils.Spectrum1D`
The template spectrum.
wblue, wred: float
Wavelength limits to include in the correlation.
delta_log_wavelength: float
Log-wavelength step to use to build the log-wavelength
scale. If None, use limits defined as explained above.
resampler
A specutils resampler to use to actually do the resampling. Defaults to
using a `~specutils.manipulation.LinearInterpolatedResampler`.
Returns
-------
resampled_observed : :class:`~specutils.Spectrum1D`
The observed spectrum resampled to a common spectral_axis.
resampled_template: :class:`~specutils.Spectrum1D`
The template spectrum resampled to a common spectral_axis.
"""
# Build an equally-spaced log-wavelength array based on
# the input and template spectrum's limit wavelengths and
# smallest sampling interval. Consider only the observed spectrum's
# sampling, since it's the one that counts for the final accuracy
# of the correlation. Alternatively, use the wred and wblue limits,
# and delta log wave provided by the user.
#
# We work with separate float and units entities instead of Quantity
# instances, due to the profusion of log10 and power function calls
# (they only work on floats)
if wblue:
w0 = np.log10(wblue)
else:
ws0 = np.log10(spectrum.spectral_axis[0].value)
wt0 = np.log10(template.spectral_axis[0].value)
w0 = min(ws0, wt0)
if wred:
w1 = np.log10(wred)
else:
ws1 = np.log10(spectrum.spectral_axis[-1].value)
wt1 = np.log10(template.spectral_axis[-1].value)
w1 = max(ws1, wt1)
if delta_log_wavelength is None:
ds = np.log10(spectrum.spectral_axis.value[1:]) - np.log10(spectrum.spectral_axis.value[:-1])
dw = ds[np.argmin(ds)]
else:
dw = delta_log_wavelength
nsamples = int((w1 - w0) / dw)
log_wave_array = np.ones(nsamples) * w0
for i in range(nsamples):
log_wave_array[i] += dw * i
# Build the corresponding wavelength array
wave_array = np.power(10., log_wave_array) * spectrum.spectral_axis.unit
# Resample spectrum and template into wavelength array so built
resampled_spectrum = resampler(spectrum, wave_array)
resampled_template = resampler(template, wave_array)
# Resampler leaves Nans on flux bins that aren't touched by it.
# We replace with zeros. This has the net effect of zero-padding
# the spectrum and/or template so they exactly match each other,
# wavelengthwise.
clean_spectrum_flux = np.nan_to_num(resampled_spectrum.flux.value) * resampled_spectrum.flux.unit
clean_template_flux = np.nan_to_num(resampled_template.flux.value) * resampled_template.flux.unit
clean_spectrum = Spectrum1D(spectral_axis=resampled_spectrum.spectral_axis,
flux=clean_spectrum_flux,
uncertainty=resampled_spectrum.uncertainty,
velocity_convention='optical',
rest_value=spectrum.rest_value)
clean_template = Spectrum1D(spectral_axis=resampled_template.spectral_axis,
flux=clean_template_flux,
uncertainty=resampled_template.uncertainty,
velocity_convention='optical',
rest_value=template.rest_value)
return clean_spectrum, clean_template
def _normalize(observed_spectrum, template_spectrum):
"""
Calculate a scale factor to be applied to the template spectrum so the
total flux in both spectra will be the same.
Parameters
----------
observed_spectrum : :class:`~specutils.Spectrum1D`
The observed spectrum.
template_spectrum : :class:`~specutils.Spectrum1D`
The template spectrum, which needs to be normalized in order to be
compared with the observed spectrum.
Returns
-------
`float`
A float which will normalize the template spectrum's flux so that it
can be compared to the observed spectrum.
"""
if hasattr(observed_spectrum, "uncertainty") and observed_spectrum.uncertainty is not None:
unc = observed_spectrum.uncertainty.represent_as(StdDevUncertainty)
num = np.nansum((observed_spectrum.flux*template_spectrum.flux)/(unc.array**2))
denom = np.nansum((template_spectrum.flux/unc.array)**2)
else:
num = np.nansum(observed_spectrum.flux*template_spectrum.flux)
denom = np.nansum(template_spectrum.flux**2)
return num/denom