Source code for lsst_inaf_agile.egg

#!/usr/bin/env python3
# Author: Akke Viitanen
# Email: akke.viitanen@helsinki.fi
# Date: 2023-04-12 22:44:16

"""Impelement wrapper for calling egg."""

import logging
import os
import shutil
from configparser import ConfigParser

from lsst_inaf_agile import util

[docs] logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
[docs] formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
[docs] handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG) handler.setFormatter(formatter) logger.addHandler(handler) # See if EGG has been installed properly
[docs] def find_egg(): if not shutil.which("egg-gencat"): raise FileNotFoundError("Could not find EGG binary. Check the EGG installation.") return shutil.which("egg-gencat")
find_egg()
[docs] class Egg: """ EGG class. Provie a wrapper for the EGG command-line software. Examples -------- >>> # Create an EGG object with 1deg2 area and logM > 9.5 (Chabrier) >>> # follow-it by calling egg.run() ... >>> egg = Egg({'area': 1, 'mmin': 9.5}) """ def __init__(self, egg_kwargs): """ Initialize EGG. Parameters ---------- egg_kwargs: dict list of EGG key value pairs used to construct the EGG command-line arguments. """
[docs] self.egg_kwargs = egg_kwargs
[docs] def get_argument_line(self, exclude=None): """Get the EGG argument line for calling it from the terminal.""" if exclude is None: exclude = [] egg_kwargs = {k: v for k, v in self.egg_kwargs.items()} return sorted([f"{k}={str(v)}" for k, v in egg_kwargs.items() if k not in exclude])
[docs] def get_filename(self): """Return an example filename constructed from the kwargs.""" if "out" in self.egg_kwargs: return self.egg_kwargs["out"] exclude = ["verbose", "out", "bands"] filename = "data/egg/" + "/".join(self.get_argument_line(exclude=exclude)) + "/egg.fits" filename = filename.replace("[", "") filename = filename.replace("]", "") filename = filename.replace(",", "/") return filename
[docs] def get_area(self): """Return EGG catalog area.""" return self.egg_kwargs["area"]
[docs] def run(self, overwrite=False): """Run EGG using the kwargs supplied.""" cmd = " ".join(["egg-gencat"] + self.get_argument_line()) filename = self.get_filename() if not overwrite and os.path.exists(filename): logger.info("Not overwriting existing file...") return 0 util.create_directory(filename) return os.system(cmd)
[docs] def get_sed(self, i, component=None, overwrite=False): """ Return an EGG SED. Parameters ---------- i: int ID of the galaxy. component: str or None One of "bulge", "disk" or None (= bulge + disk). overwrite: bool Run egg-getsed regardless of existing filename. Raises ------ ValueError If component is invalid. """ if component not in [None, "bulge", "disk"]: raise ValueError(f"Invalid {component=}") dirname = os.path.dirname(self.egg_kwargs["out"]) fname = f"{dirname}/egg-seds" if component in ["bulge", "disk"]: fname += f"-{component}" fname += f"-{i}.fits" if overwrite or not os.path.exists(fname): cmd = f"egg-getsed seds={dirname}/egg-seds.dat id={i}" if component is not None: cmd += f" component={component}" os.system(cmd) assert os.path.exists(fname) return util.read_fits(fname)
@staticmethod
[docs] def read(filename): """ Read an EGG catalog from the filename. Note that usually and for small files, loading EGG through astropy/fitsio works fine. For large files (n_bands * n_galaxies >= 2 ** 28) these routines fail due to the dtype not fitting into a C integer. The workaround is to write these columns as binary files using a separate routine, which can then be read using numpy and shaped to the correct shape. This is a dirty hack, but the alternative is to change completely how EGG writes the FITS files. """ import fitsio columns = [ "ID", "RA", "DEC", "Z", "D", "M", "SFR", "CMD", "DISK_ANGLE", "DISK_RADIUS", "DISK_RATIO", "BULGE_ANGLE", "BULGE_RADIUS", "BULGE_RATIO", "BANDS", "LAMBDA", "RFBANDS", "RFLAMBDA", "AVLINES_BULGE", "AVLINES_DISK", "CLUSTERED", "PASSIVE", "FLUX", "FLUX_BULGE", "FLUX_DISK", "RFMAG", "RFMAG_BULGE", "RFMAG_DISK", ] ret = {k: fitsio.read(filename, columns=k) for k in columns} return ret
# NOTE: fossil code below # try: # ret = {k: fitsio.read(filename, columns=k) for k in columns} # return ret ## except NotImplementedError("FIXME: catch the correct error and remove 'raise'"): # except: # raise # logger.info("Reading (and writing) the EGG flux files one-by-one") ## Remove the fluxes # columns = columns[:-6] # ret = {k: fitsio.read(filename, columns=k) for k in columns} # Ngal = ret["ID"][0].size # Nbands = ret["BANDS"][0].size # for k1 in "FLUX", "RFMAG": # for k2 in "", "_BULGE", "_DISK": # k = k1 + k2 # fin = filename # fout = filename.replace(".fits", f"_{k}.dat") # if not os.path.exists(fout): # logger.info(f" Writing {k} from {fin} to {fout}") # os.system( # f"/home/viitanen/.local/bin/write_fits_column {fin} {fout} {k} {Ngal * Nbands}" # ) # ret[k] = np.fromfile(fout, dtype=np.float32).reshape((1, Ngal, Nbands)) # return ret @staticmethod
[docs] def get_smf(z, key, filename): """ Read an EGG-like stellar mass function from a file. Parameters ---------- z: float Redshift of the stellar mass function. key: str One of "ACTIVE" or "PASSIVE". filename: str Filename containing the stellar mass function in EGG format. """ import numpy as np smf = util.read_fits(filename) i = None for _i, (zlo, zhi) in enumerate(smf["ZB"][0].T): if zlo <= z < zhi: i = _i break x = np.mean(smf["MB"][0], axis=0) y = smf[key][0][i, :] return x, y
@staticmethod
[docs] def run_config(filename): """ Run EGG for the given configuration file. Parameters ---------- filename: str Path to configuration file. Returns ------- The EGG catalog using Egg.read. """ logger.info(f"Reading {filename}") config = ConfigParser() config.read(filename) assert "egg" in config egg = Egg(config["egg"]) egg.run() return Egg.read(config["egg"]["out"])
@staticmethod
[docs] def get_example_egg_kwargs(filename): """Return and example set of EGG kwargs for testing purposes.""" return dict( out=filename, area=0.01, mmin=9.5, zmin=0.21, zmax=5.49, bands="[lsst-u,lsst-g,lsst-r,lsst-i,lsst-z,lsst-y,mock-1000-4000,mock-4400,mock-1450]", rfbands="[lsst-u,lsst-g,lsst-r,lsst-i,lsst-z,lsst-y,mock-1000-4000,mock-4400,mock-1450]", filter_db="data/egg/share/filter-db/db.dat", save_sed=1, verbose=1, )