#!/usr/bin/env python3
# Author: Akke Viitanen
# Email: akke.viitanen@helsinki.fi
# Date: 2023-02-13 16:20:51
"""Module for the combined catalog of AGNs, galaxies, and stars."""
import glob
import logging
import os
import re
from copy import deepcopy
import fitsio
import numpy as np
from lsst_inaf_agile import util
# Module-level logger. (The surrounding "[docs]" lines were Sphinx-HTML
# extraction artifacts; evaluating them would raise NameError at import time.)
logger = logging.getLogger(__name__)
class CatalogCombined:
    """
    Combined catalog class of AGN, galaxies, and stars.

    If the catalog does not exist, then create it. If the catalog exists as a
    FITS file, try to load it. Finally, if there is a database containing the
    catalog, return that one instead.

    Parameters
    ----------
    dirname: str
        Directory name to contain the catalog.
    catalog_galaxy: catalog_galaxy.CatalogGalaxy or None
        Galaxy catalog.
    catalog_agn: catalog_agn.CatalogAGN or None
        AGN catalog.
    catalog_star: catalog_star.CatalogStar or None
        Single star catalog.
    catalog_binary: catalog_star.CatalogStar or None
        Binary star catalog.
    sql_query: str
        Input SQL query if a database file is read.
    cache: bool
        Use an existing catalog whenever available. Otherwise, always overwrite
        the one in disk.
    """

    def __init__(
        self,
        dirname: str,
        catalog_galaxy=None,
        catalog_agn=None,
        catalog_star=None,
        catalog_binary=None,
        sql_query=None,
        cache=True,
    ):
        """
        Initialize CombinedCatalog.
        """
        # BUG FIX: 'dirname' was accepted but never stored, yet
        # get_filename() and the database glob below read self.dirname,
        # which raised AttributeError.
        self.dirname = dirname
        self.catalog_galaxy = catalog_galaxy
        self.catalog_agn = catalog_agn
        self.catalog_star = catalog_star
        self.catalog_binary = catalog_binary
        # NOTE: short-circuit for an existing truth catalog
        if cache and os.path.exists(filename := self.get_filename()):
            # FIX: log the actual filename (the message text was garbled)
            logger.info(f"Found catalog FITS file {filename}")
            self.catalog_combined = util.read_fits(filename)
            return
        # NOTE: short-circuit for an existing database truth catalog
        if (
            cache
            and sql_query is not None
            and (filenames := glob.glob(f"{self.dirname}/db/**/master.db", recursive=True))
        ):
            import sqlite3

            import pandas as pd
            from astropy.table import Table

            # Pick the lexicographically last match (presumably the newest run)
            filename = sorted(filenames)[-1]
            # FIX: log the actual filename (the message text was garbled)
            logger.info(f"Found database file {filename}")
            with sqlite3.connect(filename) as con:
                df = pd.read_sql_query(sql_query, con)
            self.catalog_combined = Table.from_pandas(df)
            return
        # Otherwise create the catalog, post-process it, and write it to disk
        self.catalog_combined = self.get_catalog_combined()
        self.catalog_combined = self.postprocess()
        self.write()
def get_catalogs(self):
"""Return the non-None catalogs."""
ret = []
for c in self.catalog_galaxy, self.catalog_agn, self.catalog_star, self.catalog_binary:
if c is not None:
ret.append(c)
return ret
def get_filename(self):
"""Return the FITS filename of the combined catalog."""
return f"{self.dirname}/catalog.fits"
def get_dtype(self):
"""
Return the combined catalog dtype constructed from the individual input catalogs.
Returns
-------
List of (name, type) corresponding to the names and types of the columns.
"""
dtype = {}
for catalog in self.get_catalogs():
for name in catalog.catalog.dtype.names:
dtype[name] = catalog.catalog.dtype[name]
for band in self.catalog_galaxy.bands:
dtype[band + "_total"] = np.float64
for band in self.catalog_galaxy.rfbands:
dtype["magabs_" + band + "_total"] = np.float64
return [(k, v) for k, v in dtype.items()]
def get_number_galaxy_star_binary(self):
"""
Return the tuple (N_galaxy, N_star, N_binary).
N is the number of objects in the combined catalog.
"""
return (
len(self.catalog_galaxy.catalog),
len(self.catalog_star.catalog),
len(self.catalog_binary.catalog),
)
def get_catalog_combined(self):
    r"""
    Construct the combined catalog based on the dtype.

    For each column in the catalog, the column is looked for in the
    AGN/galaxy/star/binary catalogs. Upon a match, the corresponding rows
    are assigned the values from the reference catalog. Note that some
    columns may appear in multiple catalogs; because the galaxy assignment
    runs after the AGN assignment over the same rows, galaxy values win for
    shared AGN/galaxy columns.

    The indices correspond to the following object types: AGN \in [0,
    n_galaxy[, galaxy \in [0, n_galaxy[, star \in [n_galaxy, n_galaxy +
    n_star[, and binary \in [n_galaxy + n_star, n_total[.

    Returns
    -------
    The combined catalog as a numpy ndarray.
    """
    n_galaxy, n_star, n_binary = self.get_number_galaxy_star_binary()
    n_total = n_galaxy + n_star + n_binary
    # Zero-initialized: numeric columns default to 0 and boolean to False
    catalog_combined = np.zeros(n_total, dtype=self.get_dtype())
    # Set the catalog ID
    catalog_combined["ID"] = np.arange(n_total)
    # Fetch the values for the columns
    for name in catalog_combined.dtype.names:
        logger.info(f"Setting attribute {name} {catalog_combined.dtype[name]}")
        # Handles AGN (the original comment said "galaxy", but this branch
        # reads the AGN catalog; it may be overwritten by the galaxy branch
        # below for shared columns)
        if name in self.catalog_agn.catalog.dtype.names:
            catalog_combined[name][:n_galaxy] = self.catalog_agn[name]
        # Handles galaxy (runs second over the same [0, n_galaxy[ rows)
        if name in self.catalog_galaxy.catalog.dtype.names:
            catalog_combined[name][:n_galaxy] = self.catalog_galaxy[name]
        # Handles star
        if name in self.catalog_star.catalog.dtype.names:
            catalog_combined[name][n_galaxy : n_galaxy + n_star] = self.catalog_star[name]
        # NOTE: set the flags to False for stars (redundant with np.zeros
        # initialization, kept for explicitness)
        elif catalog_combined.dtype[name] == np.dtype("bool"):
            catalog_combined[name][n_galaxy : n_galaxy + n_star] = False
        # Handles binary star
        if name in self.catalog_binary.catalog.dtype.names:
            catalog_combined[name][n_galaxy + n_star :] = self.catalog_binary[name]
        # NOTE: set the flags to False for binary stars
        elif catalog_combined.dtype[name] == np.dtype("bool"):
            catalog_combined[name][n_galaxy + n_star :] = False
    return catalog_combined
def postprocess(self):
    """Fill the derived total columns and return the combined catalog."""
    galaxy = self.catalog_galaxy
    # Total observed flux per band
    for band in galaxy.bands:
        logger.info(f"Postprocessing flux {band}")
        self.catalog_combined[f"{band}_total"] = self.get_flux_total(band, False)
    # Total rest-frame absolute magnitude per band
    for band in galaxy.rfbands:
        logger.info(f"Postprocessing magabs {band}")
        self.catalog_combined[f"magabs_{band}_total"] = self.get_flux_total(band, True)
    return self.catalog_combined
def get_flux_total(self, band, rest_frame=False):
"""
Return the total flux.
The total flux is defined for each type of object as follows:
- AGN: sum of galaxy bulge, galaxy disk, and point fluxes
- galaxy: sum of galaxy bulge and disk fluxes
- star: point fluxes
- binary star: sum of the component point fluxes
"""
# NOTE: override rest-frame behavior as EGG does not have absmags for
# disk/bulge separately
keys = ["disk", "bulge", "point"]
if rest_frame:
keys = ["", "point"]
ret = np.zeros(self.catalog_combined.size)
for p in keys:
key = "magabs_" * rest_frame + f"{band}" + f"_{p}" * (p != "")
flux = self.catalog_combined[key]
if rest_frame:
flux = util.mag_to_flux(flux)
ret += np.where(np.isfinite(flux), flux, 0.0)
if rest_frame:
ret = util.flux_to_mag(ret)
return ret
def write(self):
    """Write the combined catalog to its FITS file and return the catalog."""
    filename = self.get_filename()
    # clobber=True: overwrite an existing file instead of appending
    fitsio.write(filename, self.catalog_combined, clobber=True)
    return self.catalog_combined
def ingest(self, filename_database, name_table, if_exists="replace"):
    """
    Ingest the truth catalog into a sqlite3 database.

    Parameters
    ----------
    filename_database: str
        filename of the sqlite3 database
    name_table: str
        name of the table in the sqlite3 database
    if_exists: str
        sqlite3 action to handle an existing table
    """
    import sqlite3

    from astropy.table import Table

    logger.info("Creating the directory ...")
    util.create_directory(filename_database)
    logger.info("Creating the pandas dataframe ...")
    dataframe = Table(self.catalog_combined).to_pandas()
    logger.info("Ingesting ...")
    with sqlite3.connect(filename_database) as connection:
        dataframe.to_sql(
            name_table,
            connection,
            index=False,
            if_exists=if_exists,
            chunksize=1024,
            method="multi",
        )
def get_is_galaxy(self):
"""Return boolean vector selecting galaxies."""
return self.catalog_combined["Z"] > 0
def get_is_star(self):
"""Return boolean vector selecting stars."""
return ~self.get_is_galaxy()
def get_index_star(self, i):
"""
Return the index in the stellar catalog corresponding to 'i' in the combined catalog.
Parameters
----------
i: int
ID of the truth catalog object
"""
n_galaxy, n_star, n_binary = self.get_number_galaxy_star_binary()
# Handles single star
if n_galaxy <= i < n_galaxy + n_star:
return i - n_galaxy
# Handles binary star
if n_galaxy + n_star <= i:
return i - n_galaxy - n_star
# Handles something else
return None
def write_reference_catalog(self, filename: str, maglim: float, selection_band: str):
    """
    Write a mock LSST reference catalog in CSV format to the given 'filename'.

    The reference catalog is cut to brighter than the magnitude limit
    'maglim' in the band 'selection_band' and excludes AGN. The CSV is then
    converted with the LSST 'convertReferenceCatalog' command-line tool into
    a reference catalog directory next to 'filename'.

    Parameters
    ----------
    filename: str
        output CSV filename
    maglim: float
        magnitude limit of the selection cut
    selection_band: str
        band in which the magnitude cut is applied
    """
    logger.info("Writing the reference catalog...")
    if os.path.exists(filename):
        # FIX: the log message was garbled; report the existing file
        logger.info(f"{filename} exists, will not overwrite.")
        return
    # Cut at some magnitude limit e.g. r < 24 and non-variable sources
    select = util.flux_to_mag(self.get_flux_total(selection_band)) < float(maglim)
    select *= ~self["is_agn"]
    c = self[select]
    # Solve the parallax (only stars get a non-zero parallax)
    parallax = np.zeros_like(c, dtype=float)
    select_star = self.get_is_star()[select]
    parallax[select_star] = util.distance_modulus_to_parallax(c["D"][select_star])
    # Initialize the data to be written
    data = {
        # ID
        "source_id": c["ID"],
        # coordinates
        "ra": c["RA"],
        "dec": c["DEC"],
        # magnitudes
        "u": util.flux_to_mag(c["lsst-u_total"]),
        "g": util.flux_to_mag(c["lsst-g_total"]),
        "r": util.flux_to_mag(c["lsst-r_total"]),
        "i": util.flux_to_mag(c["lsst-i_total"]),
        "z": util.flux_to_mag(c["lsst-z_total"]),
        "y": util.flux_to_mag(c["lsst-y_total"]),
        # NOTE: for pmra see here:
        # https://gea.esac.esa.int/archive/documentation/GDR2/Gaia_archive/chap_datamodel/sec_dm_main_tables/ssec_dm_gaia_source.html#:~:text=pmra%20%3A%20Proper%20motion%20in%20right,direction%20of%20increasing%20right%20ascension.
        # https://pipelines.lsst.io/modules/lsst.meas.algorithms/creating-a-reference-catalog.html
        "pmra": np.where(np.isfinite(c["pmracosd"]), c["pmracosd"], 0),
        "pmdec": np.where(np.isfinite(c["pmdec"]), c["pmdec"], 0),
        "epoch": c["RA"].size * ["2000-01-01 12:00:00"],
        "parallax": parallax,
        # Errors
        "ra_error": np.zeros(c.size),
        "dec_error": np.zeros(c.size),
        "pmra_error": np.zeros(c.size),
        "pmdec_error": np.zeros(c.size),
        "parallax_error": np.zeros(c.size),
        # Correlations/covariances
        "ra_dec_corr": np.zeros(c.size),
        "ra_parallax_corr": np.zeros(c.size),
        "ra_pmra_corr": np.zeros(c.size),
        "ra_pmdec_corr": np.zeros(c.size),
        "dec_parallax_corr": np.zeros(c.size),
        "dec_pmra_corr": np.zeros(c.size),
        "dec_pmdec_corr": np.zeros(c.size),
        "parallax_pmra_corr": np.zeros(c.size),
        "parallax_pmdec_corr": np.zeros(c.size),
        "pmra_pmdec_corr": np.zeros(c.size),
    }
    # Write reference catalog.csv
    with open(filename, "w") as f:
        to_write = ",".join(data.keys())
        print(f"{to_write}", file=f)
        for i in range(c["RA"].size):
            to_write = ",".join([f"{v[i]}" for v in data.values()])
            print(f"{to_write}", file=f)
    # Run the LSST tools on top
    dirname = f"{os.path.dirname(filename)}/reference_catalog"
    if os.path.exists(dirname):
        os.system(f"rm -rfv {dirname}")
    util.create_directory(dirname + "/")
    # FIX: the input CSV argument had been garbled out of the command, which
    # made the conversion a no-op; pass the freshly written file.
    # NOTE(review): paths are interpolated into a shell command; keep them
    # free of shell metacharacters (or switch to subprocess.run with a list).
    os.system(f"convertReferenceCatalog {dirname} etc/config_reference_catalog.py {filename}")
    logger.info(f"Wrote reference catalog {dirname}")
def get_area(self):
    """Return the EGG catalog area in deg2, parsed from the egg.fits CMD column."""
    try:
        command = util.read_fits(f"{self.dirname}/egg.fits", columns="CMD")[0]
    except OSError:
        # Fall back to the DR1 area when the EGG output is not present
        logger.warning("Could not find egg.fits, defaulting to DR1 area of 24deg2")
        return 24.0
    # The area is embedded in the recorded command line, e.g. "... area=1.5 ..."
    return float(re.findall("area=([0-9.]+)", command)[0])
def __getitem__(self, key):
"""Return combined catalog value corresponding to 'key'."""
return self.catalog_combined[key]
def get_luminosity_function(
    self,
    key,
    bins,
    zmin,
    zmax,
    select=None,
    values=None,
    nmin=1,
    deredden=False,
    use_occupation_fraction=False,
):
    """
    Return the expected luminosity function.

    Can be used to estimate any function e.g. LX, Mstar.

    Parameters
    ----------
    key: str
        name of the column in the truth catalog
    bins: array_like
        bins for the histogram
    zmin: float
        minimum redshift
    zmax: float
        maximum redshift
    select: array_like
        boolean vector for selecting values
    values: array_like
        values to use instead of the value corresponding to 'key'
    nmin: int
        minimum allowed counts per bin
    deredden: bool
        for magnitudes, deredden the magnitude before computing the LF
    use_occupation_fraction: bool
        weight LF by occupation fraction
    """
    # select in redshift
    z = self["Z"]
    if select is None:
        select = np.ones_like(z, dtype=bool)
    # NOTE: the 0.0 fallback promotes 'select' from bool to float, so from
    # here on it acts as a per-object weight rather than a pure mask
    select = np.where((zmin <= z) & (z < zmax), select, 0.0)
    if values is None:
        # deep copy so the in-place dereddening below cannot touch the catalog
        values = deepcopy(self[key])
    # deredden AGN magnitudes?
    if deredden:
        from lsst_inaf_agile.catalog_agn import CatalogAGN
        logger.info("Dereddening the AGN magnitudes...")
        is_agn = self["is_agn"] == 1
        # NOTE(review): the dereddened values are computed from self[key]
        # (the raw catalog column), not from a caller-supplied 'values'
        # array — confirm this is intended when both 'values' and
        # deredden=True are passed
        values[is_agn] = CatalogAGN.get_magnitude_agn_dereddened(
            key,
            self[key],
            self["E_BV"],
        )[is_agn]
    # weight by occupation fraction?
    if use_occupation_fraction:
        from lsst_inaf_agile.mbh import get_occupation_fraction
        logger.info("Weighting by the occupation fraction...")
        # objects with Z > 0 are galaxies (same convention as get_is_galaxy)
        is_galaxy = self["Z"] > 0
        select = select.astype(np.float64)
        select[is_galaxy] *= get_occupation_fraction(self["M"][is_galaxy])
    # adjust zmin/zmax for the volume calculation
    zvec = z[np.isfinite(z)]
    if zmin < zvec.min() or zmax > zvec.max():
        logger.warning(f"Clipping {zmin} {zmax} to catalog range...")
        zmin, zmax = np.clip([zmin, zmax], zvec.min(), zvec.max())
    # return the luminosity function
    return util.get_key_function(
        bins,
        values,
        select,
        zmin=zmin,
        zmax=zmax,
        area_deg2=self.get_area(),
        nmin=nmin,
    )