Source code for rail.core.utilPhotometry

"""
Module that implements operations on photometric data such as magnitudes and fluxes.
"""
from abc import ABC, abstractmethod

import numpy as np
import pandas as pd

from ceci.config import StageParameter as Param
from rail.core.data import PqHandle
from rail.core.stage import RailStage

import hyperbolic  # https://github.com/jlvdb/hyperbolic


# default column names in DC2
LSST_BANDS = 'ugrizy'
DEFAULT_MAG_COLS = [f"mag_{band}_lsst" for band in LSST_BANDS]
DEFAULT_MAGERR_COLS = [f"mag_err_{band}_lsst" for band in LSST_BANDS]
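# e.g. DEFAULT_MAG_COLS[0] == "mag_u_lsst",
#      DEFAULT_MAGERR_COLS[0] == "mag_err_u_lsst"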


def _compute_flux(magnitude, zeropoint):
    """
    Compute the flux corresponding to a given magnitude and photometric zeropoint.

    Parameters
    ----------
    magnitude : array-like
        Magnitude or array of magnitudes.
    zeropoint : array-like
        Photometric zeropoint used for conversion.

    Returns
    -------
    flux : array-like
        Flux value(s).
    """
    flux = np.exp((zeropoint - magnitude) / hyperbolic.pogson)
    return flux

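# A quick sanity check of the helper above (a sketch; it assumes that
# hyperbolic.pogson equals 2.5 / ln(10), so the expression reduces to the
# classical relation flux = 10**((zeropoint - magnitude) / 2.5)):
#
#     >>> _compute_flux(magnitude=25.0, zeropoint=25.0)
#     1.0
#     >>> _compute_flux(magnitude=22.5, zeropoint=25.0)  # 2.5 mag brighter
#     10.0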

def _compute_flux_error(flux, magnitude_error):
    """
    Compute the flux error corresponding to a given flux and magnitude error.

    Parameters
    ----------
    flux : array-like
        Flux or array of fluxes.
    magnitude_error : array-like
        Magnitude error or array of magnitude errors.

    Returns
    -------
    flux_error : array-like
        Flux error value(s).
    """
    flux_error = magnitude_error / hyperbolic.pogson * flux
    return flux_error

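# The helper above follows from linear error propagation of the magnitude
# definition m = zeropoint - pogson * ln(flux): differentiating gives
# dm = pogson * dflux / flux, hence dflux = dm * flux / pogson. A sketch:
#
#     >>> flux = _compute_flux(magnitude=22.5, zeropoint=25.0)  # 10.0
#     >>> _compute_flux_error(flux, magnitude_error=hyperbolic.pogson)
#     10.0
#
# i.e. a magnitude error of pogson (~1.086 mag) corresponds to a 100% flux
# error to first order.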

class PhotormetryManipulator(RailStage, ABC):
    """
    Base class to perform operations on magnitudes. A table with input
    magnitudes and errors is processed and transformed into an output table
    with new magnitudes and errors.

    Subclasses must implement the run() and compute() methods.
    """

    name = 'PhotormetryManipulator'
    config_options = RailStage.config_options.copy()
    config_options.update(
        value_columns=Param(
            list, default=DEFAULT_MAG_COLS,
            msg="list of columns that provide photometric measurements "
                "(fluxes or magnitudes)"),
        error_columns=Param(
            list, default=DEFAULT_MAGERR_COLS,
            msg="list of columns with errors corresponding to value_columns "
                "(assuming same ordering)"),
        zeropoints=Param(
            list, default=[], required=False,
            msg="optional list of magnitude zeropoints for value_columns "
                "(assuming same ordering, defaults to 0.0)"),
        is_flux=Param(
            bool, default=False,
            msg="whether the provided quantities are fluxes or magnitudes"))
    inputs = [('input', PqHandle)]
    outputs = [('output', PqHandle)]

    def __init__(self, args, comm=None):
        super().__init__(args, comm)
        self._check_config()
        # convenience remapping of parameters
        self.value_columns = self.config.value_columns
        self.error_columns = self.config.error_columns
        self.zeropoints = self.config.zeropoints
        self.n_col = len(self.value_columns)

    def _check_config(self):
        # compare the column definitions
        n_mag = len(self.config.value_columns)
        n_err = len(self.config.error_columns)
        n_zpt = len(self.config.zeropoints)
        if n_mag != n_err:
            raise IndexError(
                f"number of magnitude and error columns do not match ({n_mag} != {n_err})")
        # check the zeropoints or fall back to the default value
        if n_zpt == 0:
            self.config.zeropoints = [0.0] * n_mag
        elif n_zpt != n_mag:
            raise IndexError(
                f"number of zeropoints and magnitude columns do not match ({n_zpt} != {n_mag})")

    def get_as_fluxes(self):
        """
        Loads the specified photometric data as fluxes, converting magnitudes
        on the fly.
        """
        input_data = self.get_data('input', allow_missing=True)
        if self.config.is_flux:
            data = input_data[self.value_columns + self.error_columns]
        else:
            data = pd.DataFrame()
            # convert magnitudes to fluxes
            for val_name, zeropoint in zip(self.value_columns, self.zeropoints):
                data[val_name] = _compute_flux(
                    input_data[val_name],
                    zeropoint=zeropoint)
            # compute flux errors from magnitude errors
            for val_name, err_name in zip(self.value_columns, self.error_columns):
                data[err_name] = _compute_flux_error(
                    data[val_name],
                    input_data[err_name])
        return data

    @abstractmethod
    def run(self):  # pragma: no cover
        """
        Implements the operation performed on the photometric data.
        """
        data = self.get_as_fluxes()
        # do work
        self.add_data('output', data)

    @abstractmethod
    def compute(self, data):  # pragma: no cover
        """
        Main method to call.

        Parameters
        ----------
        data : `PqHandle`
            Input tabular data with column names as defined in the
            configuration.

        Returns
        -------
        output : `PqHandle`
            Output tabular data.
        """
        self.set_data('input', data)
        self.run()
        self.finalize()
        return self.get_handle('output')
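
# A minimal sketch of a concrete subclass (illustrative only, not part of
# this module): run() loads the photometry as fluxes and registers its
# result under the 'output' tag, while compute() wires an input handle
# through set_data()/run()/finalize().
#
#     class FluxPassThrough(PhotormetryManipulator):  # hypothetical stage
#         name = 'FluxPassThrough'
#
#         def run(self):
#             data = self.get_as_fluxes()  # magnitudes converted if needed
#             self.add_data('output', data)
#
#         def compute(self, data):
#             self.set_data('input', data)
#             self.run()
#             self.finalize()
#             return self.get_handle('output')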


class HyperbolicSmoothing(PhotormetryManipulator):
    """
    Initial stage to compute hyperbolic magnitudes (Lupton et al. 1999).
    Estimates the smoothing parameter b that is used by the second stage
    (`HyperbolicMagnitudes`) to convert classical to hyperbolic magnitudes.
    """

    name = 'HyperbolicSmoothing'
    config_options = PhotormetryManipulator.config_options.copy()
    inputs = [('input', PqHandle)]
    outputs = [('parameters', PqHandle)]

    def run(self):
        """
        Computes the smoothing parameter b (see Lupton et al. 1999) per
        photometric band.
        """
        # get input data
        data = self.get_as_fluxes()
        fields = np.zeros(len(data), dtype=int)  # placeholder

        # compute the optimal smoothing factor b for each photometric band
        stats = []
        for fx_col, fxerr_col, zeropoint in zip(
                self.value_columns, self.error_columns, self.zeropoints):

            # compute the median flux error and zeropoint
            stats_filt = hyperbolic.compute_flux_stats(
                data[fx_col], data[fxerr_col], fields, zeropoint=zeropoint)
            # compute the smoothing parameter b (in normalised flux)
            stats_filt[hyperbolic.Keys.b] = hyperbolic.estimate_b(
                stats_filt[hyperbolic.Keys.zp],
                stats_filt[hyperbolic.Keys.flux_err])
            # compute the smoothing parameter b (in absolute flux)
            stats_filt[hyperbolic.Keys.b_abs] = (
                stats_filt[hyperbolic.Keys.ref_flux] *
                stats_filt[hyperbolic.Keys.b])

            # collect the results
            stats_filt[hyperbolic.Keys.filter] = fx_col
            stats_filt = stats_filt.reset_index().set_index([
                hyperbolic.Keys.filter, hyperbolic.Keys.field])
            stats.append(stats_filt)

        # store the resulting smoothing parameters for the next stage
        self.add_data('parameters', pd.concat(stats))

    def compute(self, data):
        """
        Main method to call. Computes the set of smoothing parameters (b) for
        an input catalogue with classical photometry and the respective
        errors. These parameters are required by the follow-up stage
        `HyperbolicMagnitudes` and are parsed as tabular data.

        Parameters
        ----------
        data : `PqHandle`
            Input table with magnitude and magnitude error columns as defined
            in the configuration.

        Returns
        -------
        parameters : `PqHandle`
            Table with smoothing parameters per photometric band and
            additional metadata.
        """
        self.set_data('input', data)
        self.run()
        self.finalize()
        return self.get_handle('parameters')
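
# Example usage (a sketch: `input_data` stands for a hypothetical handle or
# table with the configured magnitude and error columns; make_stage() is
# the stage factory inherited from ceci via RailStage):
#
#     >>> smoothing = HyperbolicSmoothing.make_stage(name='hyp_smoothing')
#     >>> params = smoothing.compute(input_data)
#     >>> params.data  # flux statistics and b, indexed by (filter, field)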


class HyperbolicMagnitudes(PhotormetryManipulator):
    """
    Convert a set of classical magnitudes to hyperbolic magnitudes (Lupton et
    al. 1999). Requires input from the initial stage (`HyperbolicSmoothing`)
    to supply optimal values for the smoothing parameters (b).
    """

    name = 'HyperbolicMagnitudes'
    config_options = PhotormetryManipulator.config_options.copy()
    inputs = [('input', PqHandle),
              ('parameters', PqHandle)]
    outputs = [('output', PqHandle)]

    def _check_filters(self, stats_table):
        """
        Check whether the column definition matches the loaded smoothing
        parameters.

        Parameters
        ----------
        stats_table : `pd.DataFrame`
            Data table that contains smoothing parameters per photometric band
            (from `HyperbolicSmoothing`).

        Raises
        ------
        KeyError
            If a filter defined in value_columns is not found in the smoothing
            parameter table.
        """
        # filters in the parameter table
        param_filters = set(stats_table.reset_index()[hyperbolic.Keys.filter])
        # filters parsed through the configuration
        config_filters = set(self.value_columns)
        # check if the filters match
        filter_diff = config_filters - param_filters
        if len(filter_diff) != 0:
            strdiff = ", ".join(sorted(filter_diff))
            raise KeyError(
                f"parameter table contains no smoothing parameters for: {strdiff}")

    def run(self):
        """
        Compute the hyperbolic magnitudes and their errors based on the
        parameters determined by `HyperbolicSmoothing`.
        """
        # get input data
        data = self.get_as_fluxes()
        stats = self.get_data('parameters', allow_missing=True)
        self._check_filters(stats)
        fields = np.zeros(len(data), dtype=int)  # placeholder for variable field/pointing depth

        # initialise the output data
        output = pd.DataFrame(index=data.index)  # allows joining on input

        # compute the median smoothing parameter b for each filter
        b = stats[hyperbolic.Keys.b].groupby(
            hyperbolic.Keys.filter).agg(np.nanmedian)
        b = b.to_dict()

        # compute the hyperbolic magnitudes
        for val_col, err_col in zip(self.value_columns, self.error_columns):

            # get the smoothing parameters
            stats_filt = hyperbolic.fill_missing_stats(stats.loc[val_col])

            # map the reference flux from fields/pointings to sources
            ref_flux_per_source = hyperbolic.fields_to_source(
                stats_filt[hyperbolic.Keys.ref_flux], fields, index=data.index)
            # normalise the fluxes and their errors
            norm_flux = data[val_col] / ref_flux_per_source
            norm_flux_err = data[err_col] / ref_flux_per_source

            # compute the hyperbolic magnitudes and their errors
            hyp_mag = hyperbolic.compute_magnitude(
                norm_flux, b[val_col])
            hyp_mag_err = hyperbolic.compute_magnitude_error(
                norm_flux, b[val_col], norm_flux_err)

            # add the data to the catalogue
            key_mag = val_col.replace("mag_", "mag_hyp_")
            key_mag_err = err_col.replace("mag_", "mag_hyp_")
            output[key_mag] = hyp_mag
            output[key_mag_err] = hyp_mag_err

        # store the results
        self.add_data('output', output)

    def compute(self, data, parameters):
        """
        Main method to call. Outputs hyperbolic magnitudes computed from a set
        of smoothing parameters and an input catalogue with classical
        magnitudes and their respective errors.

        Parameters
        ----------
        data : `PqHandle`
            Input table with photometry (magnitudes or flux columns and their
            respective uncertainties) as defined by the configuration.
        parameters : `PqHandle`
            Table with smoothing parameters per photometric band, determined
            by `HyperbolicSmoothing`.

        Returns
        -------
        output : `PqHandle`
            Output table containing the hyperbolic magnitudes and their
            uncertainties. If the column names in the input table carry the
            prefix `mag_`, the output table replaces it with `mag_hyp_`,
            otherwise the column names are identical to those of the input
            table.
        """
        self.set_data('input', data)
        self.set_data('parameters', parameters)
        self.run()
        self.finalize()
        return self.get_handle('output')
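
# End-to-end sketch of the two-stage workflow (`mags` stands for a
# hypothetical pandas DataFrame with the default DC2 columns defined at the
# top of this module):
#
#     >>> smoothing = HyperbolicSmoothing.make_stage()
#     >>> hyperbolize = HyperbolicMagnitudes.make_stage()
#     >>> params = smoothing.compute(mags)
#     >>> hyp_mags = hyperbolize.compute(mags, params)
#     >>> hyp_mags.data.columns  # mag_hyp_u_lsst, mag_hyp_err_u_lsst, ...
#
# For reference, the hyperbolic magnitude of Lupton et al. (1999) computed
# by this stage is
#     mu = -a * (asinh(x / (2 * b)) + ln(b)),  with a = 2.5 * log10(e)
# and x the flux normalised by the reference flux; it converges to the
# classical magnitude for x >> b but remains well defined for zero and
# negative fluxes.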