Core Demo Notebooks
Data, files, IO, and RAIL¶
author: Sam Schmidt
Last successfully run: Apr 18, 2022
The switchover to a ceci-based backend has increased the complexity of data access and IO in RAIL; this notebook will demonstrate a variety of ways that users may interact with data in RAIL.
In addition to the main RAIL code, we have developed a companion package, tables_io, available on GitHub. tables_io aims to simplify IO for reading/writing some of the most common file formats used within DESC: HDF5 files, parquet files, Astropy tables, and qp ensembles. There are several examples of tables_io usage in the nb directory of the tables_io repository, and we will demonstrate its usage in several places in this notebook as well; for further examples, consult the tables_io nb examples.
In short, tables_io aims to simplify file IO, and much of the IO is sorted out for you automatically if your files have the appropriate extensions: you can simply do a tables_io.read("file.fits") to read in a FITS file, or tables_io.read("newfile.pq") to read in a dataframe in parquet format. Similarly, you can specify the output format via the extension as well. This functionality is extended to qp and RAIL through their use of tables_io, and file extensions will control how files are read and written unless explicitly overridden.
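To make this concrete, here is a minimal sketch of extension-driven IO with tables_io; the file names here are hypothetical placeholders:
import tables_io

# the .pq extension selects the parquet reader (hypothetical file name)
table = tables_io.read("catalog.pq")
# convert the in-memory table to a pandas DataFrame
df = tables_io.convert(table, tables_io.types.PD_DATAFRAME)
# the .hdf5 extension selects the HDF5 writer on output
tables_io.write(table, "catalog.hdf5")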
Another concept used in the ceci-based RAIL when working in a Jupyter notebook is the DataStore, along with the DataHandle file specifications (see RAIL/rail/core/data.py for the actual code implementing these). ceci requires that each pipeline stage have defined input and output files, and is primarily geared toward pipelines rather than interactive runs in a Jupyter notebook. The DataStore enables interactive use of files in Jupyter; we will demonstrate some useful features of the DataStore below.
Let's start out with some imports:
import os
import tables_io
import rail
import qp
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
First, let's use tables_io to read in some example data. Two example files ship with RAIL containing a small amount of cosmoDC2 data from healpix pixel 9816, one for "training" and one for "validation"; they are located in the rail/examples/testdata/ directory of the RAIL repository. Let's read in one of those data files with tables_io.
(NOTE: for historical reasons, our example files store all of the data arrays in a single hdf5 group named "photometry". We will grab the data specifically from that hdf5 group by reading in the file and specifying ["photometry"] as the group in the cell below. We'll call our dataset "traindata_io" to indicate that we've read it in via tables_io, and to distinguish it from the data that we'll place in the DataStore in later steps.)
from rail.core.utils import RAILDIR
trainFile = os.path.join(RAILDIR, 'rail/examples/testdata/test_dc2_training_9816.hdf5')
testFile = os.path.join(RAILDIR, 'rail/examples/testdata/test_dc2_validation_9816.hdf5')
traindata_io = tables_io.read(trainFile)["photometry"]
tables_io reads this data in as an ordered dictionary of numpy arrays by default, though it can be converted to other data formats, such as a pandas dataframe. Let's print out the keys of the ordered dict showing the available columns, then convert the data to a pandas dataframe and look at a few of the columns as a demo:
traindata_io.keys()
odict_keys(['id', 'mag_err_g_lsst', 'mag_err_i_lsst', 'mag_err_r_lsst', 'mag_err_u_lsst', 'mag_err_y_lsst', 'mag_err_z_lsst', 'mag_g_lsst', 'mag_i_lsst', 'mag_r_lsst', 'mag_u_lsst', 'mag_y_lsst', 'mag_z_lsst', 'redshift'])
traindata_pq = tables_io.convert(traindata_io, tables_io.types.PD_DATAFRAME)
traindata_pq.head()
 | id | mag_err_g_lsst | mag_err_i_lsst | mag_err_r_lsst | mag_err_u_lsst | mag_err_y_lsst | mag_err_z_lsst | mag_g_lsst | mag_i_lsst | mag_r_lsst | mag_u_lsst | mag_y_lsst | mag_z_lsst | redshift
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 8062500000 | 0.005001 | 0.005001 | 0.005001 | 0.005046 | 0.005003 | 0.005001 | 16.960892 | 16.506310 | 16.653412 | 18.040369 | 16.423904 | 16.466377 | 0.020435 |
1 | 8062500062 | 0.005084 | 0.005075 | 0.005048 | 0.009552 | 0.005804 | 0.005193 | 20.709402 | 20.437565 | 20.533852 | 21.615589 | 20.388210 | 20.408886 | 0.019361 |
2 | 8062500124 | 0.005057 | 0.005016 | 0.005015 | 0.011148 | 0.005063 | 0.005023 | 20.437067 | 19.312630 | 19.709715 | 21.851952 | 18.770441 | 18.953411 | 0.036721 |
3 | 8062500186 | 0.005011 | 0.005007 | 0.005005 | 0.005477 | 0.005041 | 0.005014 | 19.128675 | 18.619995 | 18.803484 | 19.976501 | 18.479452 | 18.546589 | 0.039469 |
4 | 8062500248 | 0.005182 | 0.005118 | 0.005084 | 0.015486 | 0.006211 | 0.005308 | 21.242783 | 20.731707 | 20.911802 | 22.294912 | 20.645004 | 20.700289 | 0.026994 |
Next, let's set up the DataStore so that our RAIL module will know where to fetch data. We will set "allow_overwrite" so that we can overwrite data files without throwing errors while in our Jupyter notebook:
#import RailStage stuff
from rail.core.data import TableHandle
from rail.core.stage import RailStage
DS = RailStage.data_store
DS.__class__.allow_overwrite = True
We need to add our data to the DataStore. We can add previously read data, like our traindata_io, with the DS.add_data method, or add data to the DataStore directly from a file via the DS.read_file method, which we will do with our "test data". For the data already in memory we want a numpy ordered dict, so we will specify the type as a TableHandle. If, instead, we were storing a qp ensemble, we would use a QPHandle. The DataHandles are defined in RAIL/rail/core/data.py, where you can see the specific code and the available DataHandles.
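For example, a minimal sketch of adding a qp ensemble to the DataStore, assuming an existing qp.Ensemble named ens (the tag "pdfs" is just an illustrative name):
from rail.core.data import QPHandle
# hypothetical: store an existing qp.Ensemble `ens` under the tag "pdfs"
pdf_handle = DS.add_data("pdfs", ens, QPHandle)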
#add data that is already read in
train_data = DS.add_data("train_data", traindata_io, TableHandle )
To read in data from a file, we can use DS.read_file; once again we want a TableHandle, and we can feed it the testFile path defined above:
#add test data directly to datastore from file:
test_data = DS.read_file("test_data", TableHandle, testFile)
Let's list the data available to us in the DataStore:
DS
DataStore { train_data:<class 'rail.core.data.TableHandle'> None, (d) test_data:<class 'rail.core.data.TableHandle'> /opt/hostedtoolcache/Python/3.9.16/x64/lib/python3.9/site-packages/rail/examples/testdata/test_dc2_validation_9816.hdf5, (wd) }
Note that the DataStore is just a dictionary of the files. Each handle object contains the actual data, which is accessible via the .data property of that handle. While not particularly designed for it, you can manipulate the data via these dictionaries, which is handy for on-the-fly exploration in notebooks.
For example, say we want to add an additional column to the train_data, "FakeID", with a simpler identifier than the long ObjID contained in the id column:
train_data().keys()
numgals = len(train_data()['id'])
train_data()['FakeID'] = np.arange(numgals)
Let's convert our train_data to a pandas dataframe with tables_io, and our new "FakeID" column should now be present:
train_table = tables_io.convertObj(train_data(), tables_io.types.PD_DATAFRAME)
train_table.head()
 | id | mag_err_g_lsst | mag_err_i_lsst | mag_err_r_lsst | mag_err_u_lsst | mag_err_y_lsst | mag_err_z_lsst | mag_g_lsst | mag_i_lsst | mag_r_lsst | mag_u_lsst | mag_y_lsst | mag_z_lsst | redshift | FakeID
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 8062500000 | 0.005001 | 0.005001 | 0.005001 | 0.005046 | 0.005003 | 0.005001 | 16.960892 | 16.506310 | 16.653412 | 18.040369 | 16.423904 | 16.466377 | 0.020435 | 0 |
1 | 8062500062 | 0.005084 | 0.005075 | 0.005048 | 0.009552 | 0.005804 | 0.005193 | 20.709402 | 20.437565 | 20.533852 | 21.615589 | 20.388210 | 20.408886 | 0.019361 | 1 |
2 | 8062500124 | 0.005057 | 0.005016 | 0.005015 | 0.011148 | 0.005063 | 0.005023 | 20.437067 | 19.312630 | 19.709715 | 21.851952 | 18.770441 | 18.953411 | 0.036721 | 2 |
3 | 8062500186 | 0.005011 | 0.005007 | 0.005005 | 0.005477 | 0.005041 | 0.005014 | 19.128675 | 18.619995 | 18.803484 | 19.976501 | 18.479452 | 18.546589 | 0.039469 | 3 |
4 | 8062500248 | 0.005182 | 0.005118 | 0.005084 | 0.015486 | 0.006211 | 0.005308 | 21.242783 | 20.731707 | 20.911802 | 22.294912 | 20.645004 | 20.700289 | 0.026994 | 4 |
And there it is: a new "FakeID" column has been added to the end of the dataset. Success!
Using the data in a pipeline stage: photo-z estimation example¶
Now that we have our data in place, we can use it in a RAIL stage. As an example, we'll estimate photo-z's for our data: we'll train the KNearNeighPDF algorithm with our train_data, then estimate photo-z's for the test_data. We need to make a RAIL stage for each of these steps; first, we train/inform our nearest-neighbor algorithm with the train_data:
from rail.estimation.algos.knnpz import Inform_KNearNeighPDF, KNearNeighPDF
inform_knn = Inform_KNearNeighPDF.make_stage(name='inform_knn', input='train_data',
nondetect_val=99.0, model='knnpz.pkl',
hdf5_groupname='')
inform_knn.inform(train_data)
split into 7669 training and 2556 validation samples finding best fit sigma and NNeigh... best fit values are sigma=0.03166666666666667 and numneigh=7 Inserting handle into data store. model_inform_knn: inprogress_knnpz.pkl, inform_knn
<rail.core.data.ModelHandle at 0x7f65ecb5f7c0>
Running the inform method on the training data has created the "knnpz.pkl" file, which contains our trained tree along with the sigma bandwidth parameter and numneigh (the number of neighbors to use in the PDF estimation). In the future, you can skip the inform stage and simply load this pkl file directly into the estimation stage to save time.
Now, let's stage and run the actual PDF estimation on the test data. NOTE: we have set hdf5_groupname to "photometry", as the original data has all of our needed photometry in a single hdf5 group named "photometry"!
estimate_knn = KNearNeighPDF.make_stage(name='estimate_knn', hdf5_groupname='photometry', nondetect_val=99.0,
model='knnpz.pkl', output="KNNPZ_estimates.hdf5")
Inserting handle into data store. model: knnpz.pkl, estimate_knn
Note that we have specified the name of the output file here with the kwarg output="KNNPZ_estimates.hdf5". If no output is specified, the DataStore will construct its own name based on the name of the stage, and it will also default to a particular storage format; for many of the estimator codes this is a FITS file named "output_[stage name].fits".
knn_estimated = estimate_knn.estimate(test_data)
Process 0 running estimator on chunk 0 - 10000 Process 0 estimating PZ PDF for rows 0 - 10,000 Inserting handle into data store. output_estimate_knn: inprogress_KNNPZ_estimates.hdf5, estimate_knn Process 0 running estimator on chunk 10000 - 20000 Process 0 estimating PZ PDF for rows 10,000 - 20,000 Process 0 running estimator on chunk 20000 - 20449 Process 0 estimating PZ PDF for rows 20,000 - 20,449
We have successfully estimated PDFs for the ~20,000 galaxies in the test file! Note that the PDFs are in qp format! Also note that they have been written to disk as "KNNPZ_estimates.hdf5"; however, they are also still available to us via the knn_estimated handle in the DataStore.
We can do a quick plot to check our photo-z's. Our qp Ensemble can be accessed by calling knn_estimated() (it is also stored in knn_estimated.data), and the Ensemble can calculate the mode of each PDF if we give it a grid of redshift values, which we can plot against the true redshifts from the test data:
pzmodes = knn_estimated().mode(grid=np.linspace(0,3,301)).flatten()
true_zs = test_data()['photometry']['redshift']
plt.figure(figsize=(8,8))
plt.scatter(true_zs, pzmodes, label='photoz mode for KNearNeigh',s=2)
plt.xlabel("redshift", fontsize=15)
plt.ylabel("photoz mode", fontsize=15)
plt.legend(loc='upper center', fontsize=12)
<matplotlib.legend.Legend at 0x7f65ecb27bb0>
As an alternative, we can read the data from the file and make the same plot, to show that you don't need to use the DataStore; you can instead operate directly on the output files:
newens = qp.read("KNNPZ_estimates.hdf5")
newpzmodes = newens.mode(grid=np.linspace(0,3,301))
plt.figure(figsize=(8,8))
plt.scatter(true_zs, newpzmodes, label='photoz mode for KNearNeigh',s=2)
plt.xlabel("redshift", fontsize=15)
plt.ylabel("photoz mode", fontsize=15)
plt.legend(loc='upper center', fontsize=12)
<matplotlib.legend.Legend at 0x7f65eca23310>
That's about it! For more usage examples, including how to chain together multiple stages, feeding results from one into another via the DataStore names, see goldenspike.ipynb in the examples/goldenspike directory.
Computing Hyperbolic Magnitudes
hyperbolic_magnitude_test.ipynb
Computing hyperbolic magnitudes¶
Implementation of Lupton et al. (1999) by Jan Luca van den Busch.
Hyperbolic magnitudes aim to overcome limitations of classical magnitudes, which are logarithmic in flux. Hyperbolic magnitudes are implemented using the inverse hyperbolic sine and therefore behave linearly in flux at low signal-to-noise, gradually transitioning to the classical logarithmic scaling at high signal-to-noise (i.e., they are equivalent to classical magnitudes in that limit).
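For reference, a brief sketch of the defining relation from Lupton et al. (1999), in the notation used below: writing $x = f / f_{\rm ref}$ for the flux relative to the reference flux and $a \equiv 2.5 \log_{10} e \approx 1.086$, the hyperbolic magnitude (up to the zeropoint, which is 0.0 in this example) is $\mu = -a \left[ \mathrm{asinh}\!\left( x / 2b \right) + \ln b \right]$. For $x \gg b$ this recovers the classical magnitude $-2.5 \log_{10} x$, while for $x \lesssim b$ it becomes linear in $x$. Lupton et al. suggest choosing the smoothing parameter as $b \approx \sqrt{a}\,\sigma_x \approx 1.042\,\sigma_x$, with $\sigma_x$ the relative flux error; this factor of 1.042 can be seen in the ratio of the "b relative" to "flux error" columns of the parameter table computed below.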
This notebook provides an example of how to convert classical magnitudes to hyperbolic magnitudes using the pipeline stages HyperbolicSmoothing and HyperbolicMagnitudes in the rail.core module.
import os
import numpy as np
import matplotlib.pyplot as plt
import rail
from rail.core.data import TableHandle
from rail.core.stage import RailStage
from rail.core.utilPhotometry import HyperbolicSmoothing, HyperbolicMagnitudes
We first set up a data store for interactive usage of RAIL (see rail/examples/goldenspike/goldenspike.ipynb for further examples).
DS = RailStage.data_store
DS.__class__.allow_overwrite = True
Next we load some DC2 sample data that provides LSST ugrizy magnitudes and magnitude errors, which we want to convert to hyperbolic magnitudes.
from rail.core.utils import RAILDIR
testFile = os.path.join(RAILDIR, 'rail', 'examples', 'testdata', 'test_dc2_training_9816.pq')
test_mags = DS.read_file("test_data", TableHandle, testFile)
Determining the smoothing parameters¶
First we run the rail.core.HyperbolicSmoothing stage. This stage computes the smoothing parameter (called $b$ in Lupton et al. 1999), which determines the transition between the linear and logarithmic behaviour of the hyperbolic magnitudes.
The input for this stage is a table containing magnitudes and magnitude errors per object (fluxes are also supported as input by setting is_flux=True in the configuration). In this example, we assume that the magnitude zeropoint is 0.0 and that we want to convert all six LSST bands. This can be specified with the value_columns and error_columns parameters, which list the names of the magnitude columns and their corresponding magnitude-error columns.
lsst_bands = 'ugrizy'
configuration = dict(
value_columns=[f"mag_{band}_lsst" for band in lsst_bands],
error_columns=[f"mag_err_{band}_lsst" for band in lsst_bands],
zeropoints=[0.0] * len(lsst_bands),
is_flux=False)
smooth = HyperbolicSmoothing.make_stage(name='hyperbolic_smoothing', **configuration)
smooth.compute(test_mags)
Inserting handle into data store. parameters_hyperbolic_smoothing: inprogress_parameters_hyperbolic_smoothing.pq, hyperbolic_smoothing
<rail.core.data.PqHandle at 0x7f2248ef3970>
The output of this stage is a table of the relevant statistics required to compute the hyperbolic magnitudes per filter:
- the median flux error
- the zeropoint (which can be computed by comparing fluxes and magnitudes in the original hyperbolic code)
- the reference flux $f_{\rm ref}$ that corresponds to the given zeropoint
- the smoothing parameter $b$ (in terms of both the absolute flux and the relative flux $x = f / f_{\rm ref}$)
The field ID column is currently not used by the RAIL module and can be ignored.
smooth_params = smooth.get_handle("parameters").data
smooth_params
filter | field ID | flux error | zeropoint | ref. flux | b relative | b absolute
---|---|---|---|---|---|---
mag_u_lsst | 0 | 1.559839e-11 | 0.0 | 1.0 | 1.625332e-11 | 1.625332e-11 |
mag_g_lsst | 0 | 3.286980e-12 | 0.0 | 1.0 | 3.424989e-12 | 3.424989e-12 |
mag_r_lsst | 0 | 3.052049e-12 | 0.0 | 1.0 | 3.180194e-12 | 3.180194e-12 |
mag_i_lsst | 0 | 4.441195e-12 | 0.0 | 1.0 | 4.627666e-12 | 4.627666e-12 |
mag_z_lsst | 0 | 7.823318e-12 | 0.0 | 1.0 | 8.151793e-12 | 8.151793e-12 |
mag_y_lsst | 0 | 1.785106e-11 | 0.0 | 1.0 | 1.860057e-11 | 1.860057e-11 |
Computing the magnitudes¶
Based on the smoothing parameters, the hyperbolic magnitudes are computed by rail.core.HyperbolicMagnitudes.
The input for this module is, again, the table with magnitudes and magnitude errors, together with the output table of rail.core.HyperbolicSmoothing.
hypmag = HyperbolicMagnitudes.make_stage(name='hyperbolic_magnitudes', **configuration)
hypmag.compute(test_mags, smooth_params)
Inserting handle into data store. parameters: None, hyperbolic_magnitudes Inserting handle into data store. output_hyperbolic_magnitudes: inprogress_output_hyperbolic_magnitudes.pq, hyperbolic_magnitudes
<rail.core.data.PqHandle at 0x7f2290e9ce20>
The output of this module is a table with hyperbolic magnitudes and their corresponding errors.
Note: the current default is to relabel the column names by substituting mag_ with mag_hyp_. If this substitution is not possible, the column names are identical to those of the input table with classical magnitudes.
test_hypmags = hypmag.get_handle("output").data
test_hypmags
 | mag_hyp_u_lsst | mag_hyp_err_u_lsst | mag_hyp_g_lsst | mag_hyp_err_g_lsst | mag_hyp_r_lsst | mag_hyp_err_r_lsst | mag_hyp_i_lsst | mag_hyp_err_i_lsst | mag_hyp_z_lsst | mag_hyp_err_z_lsst | mag_hyp_y_lsst | mag_hyp_err_y_lsst
---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 18.040370 | 0.005046 | 16.960892 | 0.005001 | 16.653413 | 0.005001 | 16.506310 | 0.005001 | 16.466378 | 0.005001 | 16.423906 | 0.005003 |
1 | 21.615533 | 0.009551 | 20.709402 | 0.005084 | 20.533851 | 0.005048 | 20.437566 | 0.005075 | 20.408885 | 0.005193 | 20.388203 | 0.005804 |
2 | 21.851866 | 0.011146 | 20.437067 | 0.005057 | 19.709715 | 0.005015 | 19.312630 | 0.005016 | 18.953412 | 0.005023 | 18.770441 | 0.005063 |
3 | 19.976499 | 0.005477 | 19.128676 | 0.005011 | 18.803485 | 0.005005 | 18.619996 | 0.005007 | 18.546590 | 0.005014 | 18.479452 | 0.005041 |
4 | 22.294717 | 0.015481 | 21.242782 | 0.005182 | 20.911803 | 0.005084 | 20.731707 | 0.005118 | 20.700288 | 0.005308 | 20.644994 | 0.006211 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
10220 | 25.732646 | 0.301680 | 25.301790 | 0.047027 | 25.099622 | 0.036055 | 25.180361 | 0.055825 | 25.295404 | 0.108750 | 25.229366 | 0.226270 |
10221 | 25.251545 | 0.205102 | 24.512358 | 0.023323 | 24.345662 | 0.018623 | 24.434138 | 0.028559 | 24.547622 | 0.055349 | 24.678486 | 0.140864 |
10222 | 25.147493 | 0.187751 | 24.113802 | 0.016640 | 23.828346 | 0.012276 | 23.711119 | 0.015380 | 23.755514 | 0.027202 | 23.830545 | 0.065739 |
10223 | 26.305978 | 0.435503 | 25.067304 | 0.038089 | 24.770026 | 0.026890 | 24.586800 | 0.032711 | 24.781555 | 0.068406 | 24.653411 | 0.137773 |
10224 | 26.429216 | 0.461142 | 25.548904 | 0.058784 | 24.983338 | 0.032494 | 24.889564 | 0.042924 | 24.836702 | 0.071907 | 24.752944 | 0.150422 |
10225 rows × 12 columns
This plot shows the difference between the classical and hyperbolic magnitudes as a function of the classical $r$-band magnitude. The turn-off point is determined by the value of $b$ estimated above.
filt = "r"
mag_class = test_mags.data[f"mag_{filt}_lsst"]
magerr_class = test_mags.data[f"mag_err_{filt}_lsst"]
mag_hyp = test_hypmags[f"mag_hyp_{filt}_lsst"]
magerr_hyp = test_hypmags[f"mag_hyp_err_{filt}_lsst"]
fig = plt.figure(dpi=100)
plt.axhline(y=0.0, color="k", lw=0.55)
plt.scatter(mag_class, mag_class - mag_hyp, s=1)
plt.xlabel("Classical magnitudue")
plt.ylabel("Classical $-$ hyperbolic magnitude")
plt.title("$r$-band magnitude")
Text(0.5, 1.0, '$r$-band magnitude')
Iterator Test
iterator_test notebook¶
author: Eric Charles
last run successfully: March 16, 2022
This notebook demonstrates three ways to iterate over tabular data:
- Using the tables_io.iteratorNative function
- Using the rail.core.data.TableHandle data handle object
- Using the rail.core.stage.RailStage functionality
# Basic imports
import os
import rail
import tables_io
from rail.core.stage import RailStage
from rail.core.data import TableHandle
Get access to the rail DataStore, and set it to allow us to overwrite data¶
Allowing overwrites will prevent errors when re-running cells in the notebook
DS = RailStage.data_store
DS.__class__.allow_overwrite = True
Set up the path to the test data¶
from rail.core.utils import RAILDIR
pdfs_file = os.path.join(RAILDIR, "rail/examples/testdata/test_dc2_training_9816.hdf5")
Get access to the data directly, using the DataStore.read_file function¶
This will load the entire table from the file we are reading
data = DS.read_file('input', TableHandle, pdfs_file)
print(data())
OrderedDict([('photometry', OrderedDict([('id', array([8062500000, 8062500062, 8062500124, ..., 8082681636, 8082693813, 8082707059])), ('mag_err_g_lsst', array([0.00500126, 0.00508365, 0.00505737, ..., 0.01664717, 0.03818999, 0.05916394], dtype=float32)), ('mag_err_i_lsst', array([0.00500074, 0.00507535, 0.00501555, ..., 0.0153863 , 0.03277681, 0.04307469], dtype=float32)), ('mag_err_r_lsst', array([0.00500058, 0.00504773, 0.00501542, ..., 0.0122792 , 0.02692565, 0.03255744], dtype=float32)), ('mag_err_u_lsst', array([0.00504562, 0.00955173, 0.01114765, ..., 0.20123477, 0.7962344 , 0.99701214], dtype=float32)), ('mag_err_y_lsst', array([0.00500337, 0.00580441, 0.005063 , ..., 0.0662687 , 0.14290111, 0.15717329], dtype=float32)), ('mag_err_z_lsst', array([0.0050014 , 0.0051933 , 0.00502286, ..., 0.0272381 , 0.06901625, 0.07261812], dtype=float32)), ('mag_g_lsst', array([16.960892, 20.709402, 20.437067, ..., 24.11405 , 25.068745, 25.552408], dtype=float32)), ('mag_i_lsst', array([16.50631 , 20.437565, 19.31263 , ..., 23.711334, 24.587885, 24.891462], dtype=float32)), ('mag_r_lsst', array([16.653412, 20.533852, 19.709715, ..., 23.828472, 24.770744, 24.984402], dtype=float32)), ('mag_u_lsst', array([18.040369, 21.61559 , 21.851952, ..., 25.185795, 26.682219, 26.926563], dtype=float32)), ('mag_y_lsst', array([16.423904, 20.38821 , 18.770441, ..., 23.83491 , 24.673431, 24.777039], dtype=float32)), ('mag_z_lsst', array([16.466377, 20.408886, 18.953411, ..., 23.75624 , 24.786388, 24.842054], dtype=float32)), ('redshift', array([0.02043499, 0.01936132, 0.03672067, ..., 2.97927326, 2.98694714, 2.97646626]))]))])
Iterate using the tables_io.iteratorNative function¶
This will open the HDF5 file, and iterate over the file, returning chunks of data
# set up the iterator, and see what type of object the iterator is
x = tables_io.iteratorNative(pdfs_file, groupname='photometry', chunk_size=1000)
print(x)
for xx in x:
print(xx[0], xx[1], xx[2]['id'][0])
<generator object iterHdf5ToDict at 0x7f6ecfdbd430> 0 1000 8062500000 1000 2000 8062643020 2000 3000 8062942715 3000 4000 8063364908 4000 5000 8063677075 5000 6000 8064196253 6000 7000 8064664220 7000 8000 8065297891 8000 9000 8066223293 9000 10000 8067729889 10000 10225 8075587302
Iterate using the rail.core.data.TableHandle data handle object¶
This will create a TableHandle object that points to the correct file, which can be used to iterate over that file.
th = TableHandle('data', path=pdfs_file)
x = th.iterator(groupname='photometry', chunk_size=1000)
print(x)
for xx in x:
print(xx[0], xx[1], xx[2]['id'][0])
<generator object iterHdf5ToDict at 0x7f6ecfdbd5f0> 0 1000 8062500000 1000 2000 8062643020 2000 3000 8062942715 3000 4000 8063364908 4000 5000 8063677075 5000 6000 8064196253 6000 7000 8064664220 7000 8000 8065297891 8000 9000 8066223293 9000 10000 8067729889 10000 10225 8075587302
Iterate using the rail.core.stage.RailStage functionality¶
This will create a RailStage pipeline stage that takes an HDF5 file as input, so the input_iterator function can be used to iterate over that file.
from rail.core.utilStages import ColumnMapper
cm = ColumnMapper.make_stage(input=pdfs_file, chunk_size=1000, hdf5_groupname='photometry', columns=dict(id='bob'))
x = cm.input_iterator('input')
for xx in x:
print(xx[0], xx[1], xx[2]['id'][0])
0 1000 8062500000 1000 2000 8062643020 2000 3000 8062942715 3000 4000 8063364908 4000 5000 8063677075 5000 6000 8064196253 6000 7000 8064664220 7000 8000 8065297891 8000 9000 8066223293 9000 10000 8067729889 10000 10225 8075587302
Pipeline Example
RAIL Pipeline example notebook¶
author: Eric Charles
last run successfully: Dec 12, 2022
This notebook shows how to:
- Build a simple RAIL pipeline interactively,
- Save that pipeline (including configuration) to a yaml file,
- Load that pipeline from the saved yaml file,
- Run the loaded pipeline.
import os
import numpy as np
import ceci
import rail
from rail.core.stage import RailStage
from rail.creation.degradation import LSSTErrorModel, InvRedshiftIncompleteness, LineConfusion, QuantityCut
from rail.creation.engines.flowEngine import FlowCreator, FlowPosterior
from rail.core.data import TableHandle
from rail.core.utilStages import ColumnMapper, TableConverter
We'll start by setting up the Rail data store. RAIL uses ceci, which is designed for pipelines rather than interactive notebooks; the data store will work around that and enable us to use data interactively.
When working interactively, we want to allow overwriting data in the Rail data store to avoid errors if we re-run cells.
See the rail/examples/goldenspike/goldenspike.ipynb
example notebook for more details on the Data Store.
DS = RailStage.data_store
DS.__class__.allow_overwrite = True
Some configuration setup¶
The example pipeline builds some of the RAIL creation functionality into a pipeline.
Here we are defining:
- The location of the pretrained PZFlow file used with this example.
- The bands we will be generating data for.
- The names of the columns where we will be writing the error estimates.
- The grid of redshifts we use for posterior estimation.
from rail.core.utils import RAILDIR
flow_file = os.path.join(RAILDIR, 'rail/examples/goldenspike/data/pretrained_flow.pkl')
bands = ['u','g','r','i','z','y']
band_dict = {band:f'mag_{band}_lsst' for band in bands}
rename_dict = {f'mag_{band}_lsst_err':f'mag_err_{band}_lsst' for band in bands}
post_grid = [float(x) for x in np.linspace(0., 5, 21)]
Define the pipeline stages¶
The RailStage base class defines the make_stage classmethod, which allows us to make a stage of that particular type in a general way.
Note that we are passing in the configuration parameters to each pipeline stage as keyword arguments; the names of the parameters will depend on the stage type.
A couple of things are important:
- Each stage should have a unique name. In ceci, stage names default to the name of the class (e.g., FlowCreator or LSSTErrorModel); this would be problematic if you wanted two stages of the same type in a given pipeline, so be sure to assign each stage its own name.
- At this point, we aren't actually worrying about the connections between the stages.
flow_engine_test = FlowCreator.make_stage(name='flow_engine_test',
model=flow_file, n_samples=50)
lsst_error_model_test = LSSTErrorModel.make_stage(name='lsst_error_model_test',
bandNames=band_dict)
col_remapper_test = ColumnMapper.make_stage(name='col_remapper_test', hdf5_groupname='',
columns=rename_dict)
flow_post_test = FlowPosterior.make_stage(name='flow_post_test',
column='redshift', flow=flow_file,
grid=post_grid)
table_conv_test = TableConverter.make_stage(name='table_conv_test', output_format='numpyDict',
seed=12345)
No GPU/TPU found, falling back to CPU. (Set TF_CPP_MIN_LOG_LEVEL=0 and rerun for more info.)
Inserting handle into data store. model: /opt/hostedtoolcache/Python/3.9.16/x64/lib/python3.9/site-packages/rail/examples/goldenspike/data/pretrained_flow.pkl, flow_engine_test
Make the pipeline and add the stages¶
Here we make an empty interactive pipeline (interactive in the sense that it will be run locally, rather than using the batch submission mechanisms built into ceci
), and add the stages to that pipeline.
pipe = ceci.Pipeline.interactive()
stages = [flow_engine_test, lsst_error_model_test, col_remapper_test, table_conv_test]
for stage in stages:
pipe.add_stage(stage)
Here are some examples of interactive introspection into the pipeline¶
I.e., some functions that you can use to figure out what the pipeline is doing.
# Get the names of the stages
pipe.stage_names
['flow_engine_test', 'lsst_error_model_test', 'col_remapper_test', 'table_conv_test']
# Get the configuration of a particular stage
pipe.flow_engine_test.config
StageConfig{output_mode:default,n_samples:50,seed:12345,name:flow_engine_test,model:/opt/hostedtoolcache/Python/3.9.16/x64/lib/python3.9/site-packages/rail/examples/goldenspike/data/pretrained_flow.pkl,config:None,aliases:{'output': 'output_flow_engine_test'},}
# Get the list of outputs 'tags'
# These are how the stage thinks of the outputs, as a list of names associated to DataHandle types.
pipe.flow_engine_test.outputs
[('output', rail.core.data.PqHandle)]
# Get the list of outputs 'aliased tags'
# These are how the pipeline thinks of the outputs, as a unique key that points to a particular file
pipe.flow_engine_test._outputs
{'output_flow_engine_test': 'output_flow_engine_test.pq'}
Okay, now let's connect up the pipeline stages¶
We can use the RailStage.connect_input function to connect one stage to another. By default, this will connect the output data product called output of one stage to the input of the next.
lsst_error_model_test.connect_input(flow_engine_test)
col_remapper_test.connect_input(lsst_error_model_test)
#flow_post_test.connect_input(col_remapper_test, inputTag='input')
table_conv_test.connect_input(col_remapper_test)
Inserting handle into data store. output_flow_engine_test: inprogress_output_flow_engine_test.pq, flow_engine_test Inserting handle into data store. output_lsst_error_model_test: inprogress_output_lsst_error_model_test.pq, lsst_error_model_test Inserting handle into data store. output_col_remapper_test: inprogress_output_col_remapper_test.pq, col_remapper_test
Initialize the pipeline¶
This will do a few things:
- Attach any global pipeline inputs that were not specified in the connections above. In our case, the input flow file is pre-existing and must be specified as a global input.
- Specify output and logging directories.
- Optionally, create the pipeline in 'resume' mode, where it will ignore stages if all of their output already exists.
pipe.initialize(dict(model=flow_file), dict(output_dir='.', log_dir='.', resume=False), None)
(({'flow_engine_test': <Job flow_engine_test>, 'lsst_error_model_test': <Job lsst_error_model_test>, 'col_remapper_test': <Job col_remapper_test>, 'table_conv_test': <Job table_conv_test>}, [<rail.creation.engines.flowEngine.FlowCreator at 0x7f9d68a80a00>, LSSTErrorModel parameters: Model for bands: mag_u_lsst, mag_g_lsst, mag_r_lsst, mag_i_lsst, mag_z_lsst, mag_y_lsst Using error type point Exposure time = 30.0 s Number of years of observations = 10.0 Mean visits per year per band: mag_u_lsst: 5.6, mag_g_lsst: 8.0, mag_r_lsst: 18.4, mag_i_lsst: 18.4, mag_z_lsst: 16.0, mag_y_lsst: 16.0 Airmass = 1.2 Irreducible system error = 0.005 Magnitudes dimmer than 30.0 are set to nan gamma for each band: mag_u_lsst: 0.038, mag_g_lsst: 0.039, mag_r_lsst: 0.039, mag_i_lsst: 0.039, mag_z_lsst: 0.039, mag_y_lsst: 0.039 The coadded 5-sigma limiting magnitudes are: mag_u_lsst: 26.04, mag_g_lsst: 27.29, mag_r_lsst: 27.31, mag_i_lsst: 26.87, mag_z_lsst: 26.23, mag_y_lsst: 25.30 The following single-visit 5-sigma limiting magnitudes are calculated using the parameters that follow them: mag_u_lsst: 23.83, mag_g_lsst: 24.90, mag_r_lsst: 24.47, mag_i_lsst: 24.03, mag_z_lsst: 23.46, mag_y_lsst: 22.53 Cm for each band: mag_u_lsst: 23.09, mag_g_lsst: 24.42, mag_r_lsst: 24.44, mag_i_lsst: 24.32, mag_z_lsst: 24.16, mag_y_lsst: 23.73 Median zenith sky brightness in each band: mag_u_lsst: 22.99, mag_g_lsst: 22.26, mag_r_lsst: 21.2, mag_i_lsst: 20.48, mag_z_lsst: 19.6, mag_y_lsst: 18.61 Median zenith seeing FWHM (in arcseconds) for each band: mag_u_lsst: 0.81, mag_g_lsst: 0.77, mag_r_lsst: 0.73, mag_i_lsst: 0.71, mag_z_lsst: 0.69, mag_y_lsst: 0.68 Extinction coefficient for each band: mag_u_lsst: 0.491, mag_g_lsst: 0.213, mag_r_lsst: 0.126, mag_i_lsst: 0.096, mag_z_lsst: 0.069, mag_y_lsst: 0.17, Stage that applies remaps the following column names in a pandas DataFrame: f{str(self.config.columns)}, <rail.core.utilStages.TableConverter at 0x7f9d649f40d0>]), {'output_dir': '.', 'log_dir': '.', 'resume': False})
Save the pipeline¶
This will actually write two files (because this is what ceci wants):
- pipe_saved.yml, which will have a list of stages, with instructions on how to execute the stages (e.g., run this stage in parallel on 20 processors). For an interactive pipeline, those instructions will be trivial.
- pipe_saved_config.yml, which will have a dictionary of configurations for each stage.
pipe.save('pipe_saved.yml')
Read the saved pipeline¶
pr = ceci.Pipeline.read('pipe_saved.yml')
Run the newly read pipeline¶
This will actually launch a Unix process to individually run each stage of the pipeline; you can see the commands that are being executed in each case.
pr.run()
Executing flow_engine_test Command is: OMP_NUM_THREADS=1 python3 -m ceci rail.creation.engines.flowEngine.FlowCreator --model=/opt/hostedtoolcache/Python/3.9.16/x64/lib/python3.9/site-packages/rail/examples/goldenspike/data/pretrained_flow.pkl --name=flow_engine_test --config=pipe_saved_config.yml --output=./output_flow_engine_test.pq Output writing to ./flow_engine_test.out Job flow_engine_test has completed successfully! Executing lsst_error_model_test Command is: OMP_NUM_THREADS=1 python3 -m ceci rail.creation.degradation.lsst_error_model.LSSTErrorModel --input=./output_flow_engine_test.pq --name=lsst_error_model_test --config=pipe_saved_config.yml --output=./output_lsst_error_model_test.pq Output writing to ./lsst_error_model_test.out Job lsst_error_model_test has completed successfully! Executing col_remapper_test Command is: OMP_NUM_THREADS=1 python3 -m ceci rail.core.utilStages.ColumnMapper --input=./output_lsst_error_model_test.pq --name=col_remapper_test --config=pipe_saved_config.yml --output=./output_col_remapper_test.pq Output writing to ./col_remapper_test.out Job col_remapper_test has completed successfully! Executing table_conv_test Command is: OMP_NUM_THREADS=1 python3 -m ceci rail.core.utilStages.TableConverter --input=./output_col_remapper_test.pq --name=table_conv_test --config=pipe_saved_config.yml --output=./output_table_conv_test.hdf5 Output writing to ./table_conv_test.out Job table_conv_test has completed successfully!
0
Run Pipe
Run_Pipe, example of running a rail pipeline¶
author: Eric Charles
last run successfully: March 16, 2022
This notebook shows how to:
- Load a pipeline from a saved yaml file,
- Run the loaded pipeline.
import ceci
p = ceci.Pipeline.read('pipe_example.yml')
No GPU/TPU found, falling back to CPU. (Set TF_CPP_MIN_LOG_LEVEL=0 and rerun for more info.)
Inserting handle into data store. model: ../../src/rail/examples/goldenspike/data/pretrained_flow.pkl, flow_engine_test
p.run()
Executing flow_engine_test Command is: OMP_NUM_THREADS=1 python3 -m ceci rail.creation.engines.flowEngine.FlowCreator --model=../../src/rail/examples/goldenspike/data/pretrained_flow.pkl --name=flow_engine_test --config=pipe_example_config.yml --output=./output_flow_engine_test.pq Output writing to ./flow_engine_test.out Job flow_engine_test has completed successfully! Executing lsst_error_model_test Command is: OMP_NUM_THREADS=1 python3 -m ceci rail.creation.degradation.lsst_error_model.LSSTErrorModel --input=./output_flow_engine_test.pq --name=lsst_error_model_test --config=pipe_example_config.yml --output=./output_lsst_error_model_test.pq Output writing to ./lsst_error_model_test.out Job lsst_error_model_test has completed successfully! Executing col_remapper_test Command is: OMP_NUM_THREADS=1 python3 -m ceci rail.core.utilStages.ColumnMapper --input=./output_lsst_error_model_test.pq --name=col_remapper_test --config=pipe_example_config.yml --output=./output_col_remapper_test.pq Output writing to ./col_remapper_test.out Job col_remapper_test has completed successfully! Executing table_conv_test Command is: OMP_NUM_THREADS=1 python3 -m ceci rail.core.utilStages.TableConverter --input=./output_col_remapper_test.pq --name=table_conv_test --config=pipe_example_config.yml --output=./output_table_conv_test.hdf5 Output writing to ./table_conv_test.out Job table_conv_test has completed successfully!
0
Yep, that's it.¶