Source code for specutils.manipulation.extract_spectral_region

import sys

from math import floor, ceil  # faster than int(np.floor/ceil(float))

import numpy as np

from astropy import units as u
from ..spectra import Spectrum1D, SpectralRegion

__all__ = ['extract_region', 'extract_bounding_spectral_region', 'spectral_slab']


def _edge_value_to_pixel(edge_value, spectrum, order, side):
    spectral_axis = spectrum.spectral_axis

    if order == 'ascending':
        index = np.searchsorted(spectral_axis, edge_value, side=side)
        return index

    elif order == 'descending':

        if side == 'left':
            opposite_side = 'right'
        else:
            opposite_side = 'left'

        index = len(spectral_axis) - np.searchsorted(spectral_axis[::-1], edge_value, side=opposite_side)
        return index


def _subregion_to_edge_pixels(subregion, spectrum):
    """
    Calculate and return the left and right indices defined
    by the lower and upper bounds and based on the input
    `~specutils.spectra.spectrum1d.Spectrum1D`. The left and right indices will
    be returned.

    Parameters
    ----------
    spectrum: `~specutils.spectra.spectrum1d.Spectrum1D`
        The spectrum object from which the region will be extracted.

    Returns
    -------
    left_index, right_index: int, int
        Left and right indices defined by the lower and upper bounds.

    """
    spectral_axis = spectrum.spectral_axis
    if spectral_axis[-1] > spectral_axis[0]:
        order = "ascending"
        left_func = min
        right_func = max
    else:
        order = "descending"
        left_func = max
        right_func = min

    # Left/lower side of sub region
    if subregion[0].unit.is_equivalent(u.pix):
        # Pixel handling assumes always ascending
        if not spectral_axis.unit.is_equivalent(u.pix):
            left_index = floor(subregion[0].value)
        else:
            # If the lower bound is larger than the largest value, immediately return nothing
            # Assuming ascending, both bounds are "out of bounds"
            if subregion[0] > spectral_axis[-1]:
                return None, None
            else:
                # Get index of closest value
                # See https://stackoverflow.com/a/26026189 if performance becomes an issue
                left_index = np.nanargmin((np.abs(spectral_axis - subregion[0])))
                # Ensure index is inclusive of region bounds
                if (spectral_axis[left_index] > subregion[0]) and (left_index >= 1):
                    left_index -= 1
    else:
        # Convert lower value to spectrum spectral_axis units
        left_reg_in_spec_unit = left_func(subregion).to(spectral_axis.unit,
                                                        u.spectral())
        left_index = _edge_value_to_pixel(left_reg_in_spec_unit, spectrum, order, "left")

    # Right/upper side of sub region
    if subregion[1].unit.is_equivalent(u.pix):
        # Pixel handling assumes always ascending
        if not spectral_axis.unit.is_equivalent(u.pix):
            right_index = ceil(subregion[1].value)
        else:
            # If upper bound is smaller than smallest value, immediately return nothing
            # Assuming ascending, both bounds are "out of bounds"
            if subregion[1] < spectral_axis[0]:
                return None, None
            else:
                # Get index of closest value
                # See https://stackoverflow.com/a/26026189 if performance becomes an issue
                right_index = np.nanargmin((np.abs(spectral_axis - subregion[1])))
                # Ensure index is inclusive of region bounds
                if (spectral_axis[right_index] < subregion[1]) and (right_index < len(spectral_axis)):
                    right_index += 1
    else:
        # Convert upper value to spectrum spectral_axis units
        right_reg_in_spec_unit = right_func(subregion).to(spectral_axis.unit,
                                                 u.spectral())

        right_index = _edge_value_to_pixel(right_reg_in_spec_unit, spectrum, order, "right")

    return left_index, right_index


[docs] def extract_region(spectrum, region, return_single_spectrum=False): """ Extract a region from the input `~specutils.Spectrum1D` defined by the lower and upper bounds defined by the ``region`` instance. The extracted region will be returned as a new `~specutils.Spectrum1D`. Parameters ---------- spectrum: `~specutils.Spectrum1D` The spectrum object from which the region will be extracted. region: `~specutils.SpectralRegion` The spectral region to extract from the original spectrum. return_single_spectrum: `bool` If ``region`` has multiple sections, whether to return a single spectrum instead of multiple `~specutils.Spectrum1D` objects. The returned spectrum will be a unique, concatenated, spectrum of all sub-regions. Returns ------- spectrum: `~specutils.Spectrum1D` or list of `~specutils.Spectrum1D` Excised spectrum, or list of spectra if the input region contained multiple subregions and ``return_single_spectrum`` is `False`. Notes ----- The region extracted is a discrete subset of the input spectrum. No interpolation is done on the left and right side of the spectrum. The region is assumed to be a closed interval (as opposed to Python which is open on the upper end). For example: Given: A ``spectrum`` with spectral_axis of ``[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]*u.um``. A ``region`` defined as ``SpectralRegion(0.2*u.um, 0.5*u.um)`` And we calculate ``sub_spectrum = extract_region(spectrum, region)``, then the ``sub_spectrum`` spectral axis will be ``[0.2, 0.3, 0.4, 0.5] * u.um``. If the ``region`` does not overlap with the ``spectrum`` then an empty Spectrum1D object will be returned. """ extracted_spectrum = [] for subregion in region._subregions: left_index, right_index = _subregion_to_edge_pixels(subregion, spectrum) # If both indices are out of bounds then return an empty spectrum if left_index == right_index: empty_spectrum = Spectrum1D(spectral_axis=[]*spectrum.spectral_axis.unit, flux=[]*spectrum.flux.unit) extracted_spectrum.append(empty_spectrum) else: extracted_spectrum.append(spectrum[..., left_index:right_index]) # If there is only one subregion in the region then we will # just return a spectrum. if len(region) == 1: extracted_spectrum = extracted_spectrum[0] # Otherwise, if requested to return a single spectrum, we need to combine # the spectrum1d objects in extracted_spectrum and return a single object. elif return_single_spectrum: concat_keys = ['flux', 'uncertainty', 'mask'] # spectral_axis handled manually copy_keys = ['velocity_convention', 'rest_value', 'meta'] # NOTE: WCS is intentionally dropped, which will then fallback on lookup def _get_joined_value(sps, key, unique_inds=None): if key == 'uncertainty': # uncertainty cannot be appended directly as its an object, # not an array so instead we'll take a copy of the first entry # and overwrite the internal array with an appended array uncert = sps[0].uncertainty if uncert is None: return None uncert._array = np.concatenate([sp.uncertainty._array for sp in sps]) return uncert[unique_inds] if unique_inds is not None else uncert elif key in concat_keys or key == 'spectral_axis': if getattr(sps[0], key) is None: return None concat_arr = np.concatenate([getattr(sp, key) for sp in sps]) return concat_arr[unique_inds] if unique_inds is not None else concat_arr else: # all were extracted from the same input spectrum, so we don't # need to make sure the properties match return getattr(sps[0], key) # we'll need to account for removing overlapped regions in the spectral axis, # so we'll concatenate that first and track the unique indices spectral_axis = _get_joined_value(extracted_spectrum, 'spectral_axis') spectral_axis_unique, unique_inds = np.unique(spectral_axis, return_index=True) return Spectrum1D(spectral_axis=spectral_axis_unique, **{key: _get_joined_value(extracted_spectrum, key, unique_inds) for key in concat_keys+copy_keys}) return extracted_spectrum
[docs] def spectral_slab(spectrum, lower, upper): """ Extract a slab from the input `~specutils.Spectrum1D` defined by the lower and upper bounds defined by the ``region`` instance. The extracted region will be returned as a new `~specutils.Spectrum1D`. Parameters ---------- spectrum: `~specutils.Spectrum1D` The spectrum object from which the region will be extracted. lower, upper: `~astropy.units.Quantity` The lower and upper bounds of the region to extract from the original spectrum. Returns ------- spectrum: `~specutils.Spectrum1D` or list of `~specutils.Spectrum1D` Excised spectrum, or list of spectra if the input region contained multiple subregions. Notes ----- This is for now just a proxy for function `extract_region`, to ease the transition from spectral-cube. """ region = SpectralRegion(lower, upper) return extract_region(spectrum, region)
[docs] def extract_bounding_spectral_region(spectrum, region): """ Extract the entire bounding region that encompasses all sub-regions contained in a multi-sub-region instance of `~specutils.SpectralRegion`. In case only one sub-region exists, this method is equivalent to `extract_region`. Parameters ---------- spectrum: `~specutils.Spectrum1D` The spectrum object from which the region will be extracted. region: `~specutils.SpectralRegion` The spectral region to extract from the original spectrum, comprised of one or more sub-regions. Returns ------- spectrum: `~specutils.Spectrum1D` Excised spectrum from the bounding region defined by the set of sub-regions in the input ``region`` instance. """ # If there is only one subregion in the region then we will # just return a spectrum. if len(region) == 1: return extract_region(spectrum, region) min_left = sys.maxsize max_right = -sys.maxsize - 1 # Look for indices that bound the entire set of sub-regions. index_list = [_subregion_to_edge_pixels(sr, spectrum) for sr in region._subregions] for left_index, right_index in index_list: if left_index is not None: min_left = min(left_index, min_left) if right_index is not None: max_right = max(right_index, max_right) # If both indices are out of bounds then return an empty spectrum if min_left is None and max_right is None: empty_spectrum = Spectrum1D(spectral_axis=[]*spectrum.spectral_axis.unit, flux=[]*spectrum.flux.unit) return empty_spectrum else: # If only one index is out of bounds then set it to # the lower or upper extent if min_left is None: min_left = 0 if max_right is None: max_right = len(spectrum.spectral_axis) if min_left > max_right: min_left, max_right = max_right, min_left return spectrum[..., min_left:max_right]