#!/usr/bin/env python3
# Author: Akke Viitanen
# Email: akke.viitanen@helsinki.fi
# Date: 2023-04-12 22:44:16
"""Impelement wrapper for calling egg."""
import logging
import os
import shutil
from configparser import ConfigParser
from lsst_inaf_agile import util
[docs]
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
[docs]
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)
logger.addHandler(handler)
# See if EGG has been installed properly
[docs]
def find_egg():
if not shutil.which("egg-gencat"):
raise FileNotFoundError("Could not find EGG binary. Check the EGG installation.")
return shutil.which("egg-gencat")
find_egg()
[docs]
class Egg:
"""
EGG class.
Provie a wrapper for the EGG command-line software.
Examples
--------
>>> # Create an EGG object with 1deg2 area and logM > 9.5 (Chabrier)
>>> # follow-it by calling egg.run() ...
>>> egg = Egg({'area': 1, 'mmin': 9.5})
"""
def __init__(self, egg_kwargs):
"""
Initialize EGG.
Parameters
----------
egg_kwargs: dict
list of EGG key value pairs used to construct the EGG command-line arguments.
"""
[docs]
self.egg_kwargs = egg_kwargs
[docs]
def get_argument_line(self, exclude=None):
"""Get the EGG argument line for calling it from the terminal."""
if exclude is None:
exclude = []
egg_kwargs = {k: v for k, v in self.egg_kwargs.items()}
return sorted([f"{k}={str(v)}" for k, v in egg_kwargs.items() if k not in exclude])
[docs]
def get_filename(self):
"""Return an example filename constructed from the kwargs."""
if "out" in self.egg_kwargs:
return self.egg_kwargs["out"]
exclude = ["verbose", "out", "bands"]
filename = "data/egg/" + "/".join(self.get_argument_line(exclude=exclude)) + "/egg.fits"
filename = filename.replace("[", "")
filename = filename.replace("]", "")
filename = filename.replace(",", "/")
return filename
[docs]
def get_area(self):
"""Return EGG catalog area."""
return self.egg_kwargs["area"]
[docs]
def run(self, overwrite=False):
"""Run EGG using the kwargs supplied."""
cmd = " ".join(["egg-gencat"] + self.get_argument_line())
filename = self.get_filename()
if not overwrite and os.path.exists(filename):
logger.info("Not overwriting existing file...")
return 0
util.create_directory(filename)
return os.system(cmd)
[docs]
def get_sed(self, i, component=None, overwrite=False):
"""
Return an EGG SED.
Parameters
----------
i: int
ID of the galaxy.
component: str or None
One of "bulge", "disk" or None (= bulge + disk).
overwrite: bool
Run egg-getsed regardless of existing filename.
Raises
------
ValueError
If component is invalid.
"""
if component not in [None, "bulge", "disk"]:
raise ValueError(f"Invalid {component=}")
dirname = os.path.dirname(self.egg_kwargs["out"])
fname = f"{dirname}/egg-seds"
if component in ["bulge", "disk"]:
fname += f"-{component}"
fname += f"-{i}.fits"
if overwrite or not os.path.exists(fname):
cmd = f"egg-getsed seds={dirname}/egg-seds.dat id={i}"
if component is not None:
cmd += f" component={component}"
os.system(cmd)
assert os.path.exists(fname)
return util.read_fits(fname)
@staticmethod
[docs]
def read(filename):
"""
Read an EGG catalog from the filename.
Note that usually and for small files, loading EGG through
astropy/fitsio works fine. For large files (n_bands * n_galaxies >= 2
** 28) these routines fail due to the dtype not fitting into a C
integer.
The workaround is to write these columns as binary files using a
separate routine, which can then be read using numpy and shaped to the
correct shape.
This is a dirty hack, but the alternative is to change completely how
EGG writes the FITS files.
"""
import fitsio
columns = [
"ID",
"RA",
"DEC",
"Z",
"D",
"M",
"SFR",
"CMD",
"DISK_ANGLE",
"DISK_RADIUS",
"DISK_RATIO",
"BULGE_ANGLE",
"BULGE_RADIUS",
"BULGE_RATIO",
"BANDS",
"LAMBDA",
"RFBANDS",
"RFLAMBDA",
"AVLINES_BULGE",
"AVLINES_DISK",
"CLUSTERED",
"PASSIVE",
"FLUX",
"FLUX_BULGE",
"FLUX_DISK",
"RFMAG",
"RFMAG_BULGE",
"RFMAG_DISK",
]
ret = {k: fitsio.read(filename, columns=k) for k in columns}
return ret
# NOTE: fossil code below
# try:
# ret = {k: fitsio.read(filename, columns=k) for k in columns}
# return ret
## except NotImplementedError("FIXME: catch the correct error and remove 'raise'"):
# except:
# raise
# logger.info("Reading (and writing) the EGG flux files one-by-one")
## Remove the fluxes
# columns = columns[:-6]
# ret = {k: fitsio.read(filename, columns=k) for k in columns}
# Ngal = ret["ID"][0].size
# Nbands = ret["BANDS"][0].size
# for k1 in "FLUX", "RFMAG":
# for k2 in "", "_BULGE", "_DISK":
# k = k1 + k2
# fin = filename
# fout = filename.replace(".fits", f"_{k}.dat")
# if not os.path.exists(fout):
# logger.info(f" Writing {k} from {fin} to {fout}")
# os.system(
# f"/home/viitanen/.local/bin/write_fits_column {fin} {fout} {k} {Ngal * Nbands}"
# )
# ret[k] = np.fromfile(fout, dtype=np.float32).reshape((1, Ngal, Nbands))
# return ret
@staticmethod
[docs]
def get_smf(z, key, filename):
"""
Read an EGG-like stellar mass function from a file.
Parameters
----------
z: float
Redshift of the stellar mass function.
key: str
One of "ACTIVE" or "PASSIVE".
filename: str
Filename containing the stellar mass function in EGG format.
"""
import numpy as np
smf = util.read_fits(filename)
i = None
for _i, (zlo, zhi) in enumerate(smf["ZB"][0].T):
if zlo <= z < zhi:
i = _i
break
x = np.mean(smf["MB"][0], axis=0)
y = smf[key][0][i, :]
return x, y
@staticmethod
[docs]
def run_config(filename):
"""
Run EGG for the given configuration file.
Parameters
----------
filename: str
Path to configuration file.
Returns
-------
The EGG catalog using Egg.read.
"""
logger.info(f"Reading {filename}")
config = ConfigParser()
config.read(filename)
assert "egg" in config
egg = Egg(config["egg"])
egg.run()
return Egg.read(config["egg"]["out"])
@staticmethod
[docs]
def get_example_egg_kwargs(filename):
"""Return and example set of EGG kwargs for testing purposes."""
return dict(
out=filename,
area=0.01,
mmin=9.5,
zmin=0.21,
zmax=5.49,
bands="[lsst-u,lsst-g,lsst-r,lsst-i,lsst-z,lsst-y,mock-1000-4000,mock-4400,mock-1450]",
rfbands="[lsst-u,lsst-g,lsst-r,lsst-i,lsst-z,lsst-y,mock-1000-4000,mock-4400,mock-1450]",
filter_db="data/egg/share/filter-db/db.dat",
save_sed=1,
verbose=1,
)