Source code for lsst_inaf_agile.catalog_combined

#!/usr/bin/env python3
# Author: Akke Viitanen
# Email: akke.viitanen@helsinki.fi
# Date: 2023-02-13 16:20:51

"""Module for the combined catalog of AGNs, galaxies, and stars."""

import glob
import logging
import os
import re
from copy import deepcopy

import fitsio
import numpy as np

from lsst_inaf_agile import util

[docs] logger = logging.getLogger(__name__)
[docs] class CatalogCombined: """ Combined catalog class of AGN, galaxies, and stars. If the catalog does not exist, then create it. If the catalog exists as a FITS file, try to load it. Finally, if there is a database containing the catalog, return that one instead. Parameters ---------- dirname: str Directory name to contain the catalog. catalog_galaxy: catalog_galaxy.CatalogGalaxy or None Galaxy catalog. catalog_agn: catalog_agn.CatalogAGN or None AGN catalog. catalog_star: catalog_star.CatalogStar or None Single star catalog. catalog_binary: catalog_star.CatalogStar or None Binary star catalog. sql_query: str Input SQL query if a database file is read. cache: bool Use an existing catalog whenever available. Otherwise, always overwrite the one in disk. """ def __init__( self, dirname: str, catalog_galaxy=None, catalog_agn=None, catalog_star=None, catalog_binary=None, sql_query=None, cache=True, ): """ Initialize CombinedCatalog. """
[docs] self.dirname = dirname
[docs] self.catalog_galaxy = catalog_galaxy
[docs] self.catalog_agn = catalog_agn
[docs] self.catalog_star = catalog_star
[docs] self.catalog_binary = catalog_binary
# NOTE: short-circuit for an existing truth catalog if cache and os.path.exists(filename := self.get_filename()): logger.info(f"Found catalog FITS file {filename}") self.catalog_combined = util.read_fits(filename) return # NOTE: short-circuit for an existing database truth catalog if ( cache and sql_query is not None and (filenames := glob.glob(f"{self.dirname}/db/**/master.db", recursive=True)) ): import sqlite3 import pandas as pd from astropy.table import Table filename = sorted(filenames)[-1] logger.info(f"Found database file {filename}") with sqlite3.connect(filename) as con: df = pd.read_sql_query(sql_query, con) self.catalog_combined = Table.from_pandas(df) return # Create the catalog
[docs] self.catalog_combined = self.get_catalog_combined()
self.catalog_combined = self.postprocess() self.write()
[docs] def get_catalogs(self): """Return the non-None catalogs.""" ret = [] for c in self.catalog_galaxy, self.catalog_agn, self.catalog_star, self.catalog_binary: if c is not None: ret.append(c) return ret
[docs] def get_filename(self): """Return the FITS filename of the combined catalog.""" return f"{self.dirname}/catalog.fits"
[docs] def get_dtype(self): """ Return the combined catalog dtype constructed from the individual input catalogs. Returns ------- List of (name, type) corresponding to the names and types of the columns. """ dtype = {} for catalog in self.get_catalogs(): for name in catalog.catalog.dtype.names: dtype[name] = catalog.catalog.dtype[name] for band in self.catalog_galaxy.bands: dtype[band + "_total"] = np.float64 for band in self.catalog_galaxy.rfbands: dtype["magabs_" + band + "_total"] = np.float64 return [(k, v) for k, v in dtype.items()]
[docs] def get_number_galaxy_star_binary(self): """ Return the tuple (N_galaxy, N_star, N_binary). N is the number of objects in the combined catalog. """ return ( len(self.catalog_galaxy.catalog), len(self.catalog_star.catalog), len(self.catalog_binary.catalog), )
[docs] def get_catalog_combined(self): r""" Construct the combined catalog based on the dtype. For each column in the catalog, the column is looked for in the AGN/galaxy/star/binary catalogs. Upon a match, the corresponding rows are assigned the values from the reference catalog. Note that some columns may appear in multiple catalog. The indices correspond to the following object types: AGN \in [0, n_galaxy[, galaxy \in [0, n_galaxy[, star \in [n_galaxy, n_galaxy + n_star], and binary \in [n_galaxy + n_star, n_total[. Returns ------- The combined catalog as a numpy ndarray. """ n_galaxy, n_star, n_binary = self.get_number_galaxy_star_binary() n_total = n_galaxy + n_star + n_binary catalog_combined = np.zeros(n_total, dtype=self.get_dtype()) # Set the catalog ID catalog_combined["ID"] = np.arange(n_total) # Fetch the values for the columns for name in catalog_combined.dtype.names: logger.info(f"Setting attribute {name} {catalog_combined.dtype[name]}") # Handles galaxy if name in self.catalog_agn.catalog.dtype.names: catalog_combined[name][:n_galaxy] = self.catalog_agn[name] # Handles AGN if name in self.catalog_galaxy.catalog.dtype.names: catalog_combined[name][:n_galaxy] = self.catalog_galaxy[name] # Handles star if name in self.catalog_star.catalog.dtype.names: catalog_combined[name][n_galaxy : n_galaxy + n_star] = self.catalog_star[name] # NOTE: set the flags to False for stars elif catalog_combined.dtype[name] == np.dtype("bool"): catalog_combined[name][n_galaxy : n_galaxy + n_star] = False # Handles binary star if name in self.catalog_binary.catalog.dtype.names: catalog_combined[name][n_galaxy + n_star :] = self.catalog_binary[name] # NOTE: set the flags to False for stars elif catalog_combined.dtype[name] == np.dtype("bool"): catalog_combined[name][n_galaxy + n_star :] = False return catalog_combined
[docs] def postprocess(self): """Add columns in post-processing.""" for band in self.catalog_galaxy.bands: logger.info(f"Postprocessing flux {band}") self.catalog_combined[band + "_total"] = self.get_flux_total(band, False) for band in self.catalog_galaxy.rfbands: logger.info(f"Postprocessing magabs {band}") self.catalog_combined["magabs_" + band + "_total"] = self.get_flux_total(band, True) return self.catalog_combined
[docs] def get_flux_total(self, band, rest_frame=False): """ Return the total flux. The total flux is defined for each type of object as follows: - AGN: sum of galaxy bulge, galaxy disk, and point fluxes - galaxy: sum of galaxy bulge and disk fluxes - star: point fluxes - binary star: sum of the component point fluxes """ # NOTE: override rest-frame behavior as EGG does not have absmags for # disk/bulge separately keys = ["disk", "bulge", "point"] if rest_frame: keys = ["", "point"] ret = np.zeros(self.catalog_combined.size) for p in keys: key = "magabs_" * rest_frame + f"{band}" + f"_{p}" * (p != "") flux = self.catalog_combined[key] if rest_frame: flux = util.mag_to_flux(flux) ret += np.where(np.isfinite(flux), flux, 0.0) if rest_frame: ret = util.flux_to_mag(ret) return ret
[docs] def write(self): """Write the combined catalog to a FITS file.""" fitsio.write(self.get_filename(), self.catalog_combined, clobber=True) return self.catalog_combined
[docs] def ingest(self, filename_database, name_table, if_exists="replace"): """ Ingest the truth catalog into a sqlite3 database. Parameters ---------- filename_database: str filename of the sqlite3 database name_table: str name of the table in the sqlite3 database if_exists: str sqlite3 action to handle an existing table """ import sqlite3 from astropy.table import Table logger.info("Creating the directory ...") util.create_directory(filename_database) logger.info("Creating the pandas dataframe ...") table = Table(self.catalog_combined) df = table.to_pandas() logger.info("Ingesting ...") with sqlite3.connect(filename_database) as con: df.to_sql(name_table, con, index=False, if_exists=if_exists, chunksize=1024, method="multi")
[docs] def get_is_galaxy(self): """Return boolean vector selecting galaxies.""" return self.catalog_combined["Z"] > 0
[docs] def get_is_star(self): """Return boolean vector selecting stars.""" return ~self.get_is_galaxy()
[docs] def get_index_star(self, i): """ Return the index in the stellar catalog corresponding to 'i' in the combined catalog. Parameters ---------- i: int ID of the truth catalog object """ n_galaxy, n_star, n_binary = self.get_number_galaxy_star_binary() # Handles single star if n_galaxy <= i < n_galaxy + n_star: return i - n_galaxy # Handles binary star if n_galaxy + n_star <= i: return i - n_galaxy - n_star # Handles something else return None
[docs] def write_reference_catalog(self, filename: str, maglim: float, selection_band: str): """ Write a mock LSST reference catalog in CSV format to the given 'filename'. The reference catalog is cut to brighter than the magnitude limit 'maglim' in the band 'selection_band'. """ logger.info("Writing the reference catalog...") if os.path.exists(filename): logger.info(f"{filename} exists, will not overwrite.") return # Cut at some magnitude limit e.g. r < 24 and non-variable sources select = util.flux_to_mag(self.get_flux_total(selection_band)) < float(maglim) select *= ~self["is_agn"] c = self[select] # Solve the parallax parallax = np.zeros_like(c, dtype=float) select_star = self.get_is_star()[select] parallax[select_star] = util.distance_modulus_to_parallax(c["D"][select_star]) # Initialize the data to be written data = { # ID "source_id": c["ID"], # coordinates "ra": c["RA"], "dec": c["DEC"], # magnitudes "u": util.flux_to_mag(c["lsst-u_total"]), "g": util.flux_to_mag(c["lsst-g_total"]), "r": util.flux_to_mag(c["lsst-r_total"]), "i": util.flux_to_mag(c["lsst-i_total"]), "z": util.flux_to_mag(c["lsst-z_total"]), "y": util.flux_to_mag(c["lsst-y_total"]), # NOTE: for pmra see here: # https://gea.esac.esa.int/archive/documentation/GDR2/Gaia_archive/chap_datamodel/sec_dm_main_tables/ssec_dm_gaia_source.html#:~:text=pmra%20%3A%20Proper%20motion%20in%20right,direction%20of%20increasing%20right%20ascension. # https://pipelines.lsst.io/modules/lsst.meas.algorithms/creating-a-reference-catalog.html "pmra": np.where(np.isfinite(c["pmracosd"]), c["pmracosd"], 0), "pmdec": np.where(np.isfinite(c["pmdec"]), c["pmdec"], 0), "epoch": c["RA"].size * ["2000-01-01 12:00:00"], "parallax": parallax, # Errors "ra_error": np.zeros(c.size), "dec_error": np.zeros(c.size), "pmra_error": np.zeros(c.size), "pmdec_error": np.zeros(c.size), "parallax_error": np.zeros(c.size), # Correlations/covariances "ra_dec_corr": np.zeros(c.size), "ra_parallax_corr": np.zeros(c.size), "ra_pmra_corr": np.zeros(c.size), "ra_pmdec_corr": np.zeros(c.size), "dec_parallax_corr": np.zeros(c.size), "dec_pmra_corr": np.zeros(c.size), "dec_pmdec_corr": np.zeros(c.size), "parallax_pmra_corr": np.zeros(c.size), "parallax_pmdec_corr": np.zeros(c.size), "pmra_pmdec_corr": np.zeros(c.size), } # Write reference catalog.csv with open(filename, "w") as f: to_write = ",".join(data.keys()) print(f"{to_write}", file=f) for i in range(c["RA"].size): to_write = ",".join([f"{v[i]}" for v in data.values()]) print(f"{to_write}", file=f) # Run the LSST tools on top dirname = f"{os.path.dirname(filename)}/reference_catalog" if os.path.exists(dirname): os.system(f"rm -rfv {dirname}") util.create_directory(dirname + "/") os.system(f"convertReferenceCatalog {dirname} etc/config_reference_catalog.py {filename}") logger.info(f"Wrote reference catalog {filename}")
[docs] def get_area(self): """Return the EGG catalog area in deg2.""" try: cmd = util.read_fits(f"{self.dirname}/egg.fits", columns="CMD")[0] area_string = re.findall("area=([0-9.]+)", cmd)[0] return float(area_string) except OSError: logger.warning("Could not find egg.fits, defaulting to DR1 area of 24deg2") return 24.0
[docs] def __getitem__(self, key): """Return combined catalog value corresponding to 'key'.""" return self.catalog_combined[key]
[docs] def get_luminosity_function( self, key, bins, zmin, zmax, select=None, values=None, nmin=1, deredden=False, use_occupation_fraction=False, ): """ Return the expected luminosity function. Can be used to estimate any function e.g. LX, Mstar. Parameters ---------- key: str name of the column in the truth catalog bins: array_like bins for the histogram zmin: float minimum redshift zmax: float maximum redshift select: array_like boolean vector for selecting values values: array_like values to use instead of the value corresponding to 'key' nmin: int minimum allowed counts per bin deredden: bool for magnitudes, deredden the magnitude before computing the LF use_occupation_fraction: bool weight LF by occupation fraction """ # select in redshift z = self["Z"] if select is None: select = np.ones_like(z, dtype=bool) select = np.where((zmin <= z) & (z < zmax), select, 0.0) if values is None: values = deepcopy(self[key]) # deredden AGN magnitudes? if deredden: from lsst_inaf_agile.catalog_agn import CatalogAGN logger.info("Dereddening the AGN magnitudes...") is_agn = self["is_agn"] == 1 values[is_agn] = CatalogAGN.get_magnitude_agn_dereddened( key, self[key], self["E_BV"], )[is_agn] # weight by occupation fraction? if use_occupation_fraction: from lsst_inaf_agile.mbh import get_occupation_fraction logger.info("Weighting by the occupation fraction...") is_galaxy = self["Z"] > 0 select = select.astype(np.float64) select[is_galaxy] *= get_occupation_fraction(self["M"][is_galaxy]) # adjust zmin/zmax for the volume calculation zvec = z[np.isfinite(z)] if zmin < zvec.min() or zmax > zvec.max(): logger.warning(f"Clipping {zmin} {zmax} to catalog range...") zmin, zmax = np.clip([zmin, zmax], zvec.min(), zvec.max()) # return the luminosity function return util.get_key_function( bins, values, select, zmin=zmin, zmax=zmax, area_deg2=self.get_area(), nmin=nmin, )