#!/usr/bin/env python3
# Author: Akke Viitanen
# Email: akke.viitanen@helsinki.fi
# Date: 2023-02-13 16:20:51
"""Create Galaxy+AGN mocks for the LSST Italian AGN in-kind contribution."""
import logging
import multiprocessing
import os
import time
from dataclasses import dataclass
from itertools import product
import astropy.units as u
import fitsio
import numpy as np
import pandas as pd
from mock_catalog_SED.qsogen_4_catalog import qsosed
from lsst_inaf_agile import lightcurve, lusso2010, sed, util, zou2024
from lsst_inaf_agile.catalog_galaxy import CatalogGalaxy
from lsst_inaf_agile.mbh import get_log_mbh_continuity_new2
from lsst_inaf_agile.merloni2014 import Merloni2014
[docs]
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
[docs]
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)
logger.addHandler(handler)
# Eddington ratio in cgs
[docs]
LOG_LAMBDA_EDD = np.log10(1.26e38)
# Maximum allowed E(B-V)
# Optimization: store the pre-computed AGN fluxes
# key is (ID, band, is_rest_frame)
[docs]
FLUXES: dict[tuple[int, str, bool], float] = {}
@dataclass
[docs]
class QsogenPosteriorDistribution:
[docs]
filename: str = "data/posteriors/posterior_frozen.dat"
[docs]
posterior_distribution = pd.read_csv(filename, sep=" ")
[docs]
parameter_names = posterior_distribution.columns
[docs]
QPODI = QsogenPosteriorDistribution()
[docs]
class CatalogAGN:
"""
AGN catalog class.
Parameters
----------
dirname: str
directory name to write the catalog to
catalog_galaxy: CatalogGalaxy
underlying galaxy catalog
type_plambda: str
assumed p(lambda)
save_sed: bool
save AGN SEDs to file
seed: int
random number seed
merloni2014: Merloni2014
assumed Merloni2014 obscuration model
filter_db: str, optional
filename of the EGG filter database (db.dat)
overwrite: bool, optional
overwrite an existing AGN catalog
"""
def __init__(
self,
dirname: str,
catalog_galaxy: CatalogGalaxy,
type_plambda: str,
save_sed: bool,
seed: int,
merloni2014: Merloni2014,
filter_db: str = "data/egg/share/filter-db/db.dat",
qsogen_posterior_distribution: QsogenPosteriorDistribution = QPODI,
overwrite: bool = False,
):
"""
Initialize the AGN catalog.
"""
[docs]
self.catalog_galaxy = catalog_galaxy
[docs]
self.type_plambda = type_plambda
[docs]
self.save_sed = save_sed
[docs]
self.merloni2014 = merloni2014
[docs]
self.filter_db = filter_db
[docs]
self.qsogen_posterior_distribution = qsogen_posterior_distribution
# NOTE: mock-1000-4000 column is needed in the galaxy catalog
if "magabs_mock-1000-4000" not in self.catalog_galaxy.catalog.dtype.names:
raise ValueError("Galaxy catalog must have magabs_mock-1000-4000 flux column")
# Set the seed
if self.seed is not None:
self.seed = int(self.seed)
np.random.seed(self.seed)
# Create the catalog
[docs]
self.catalog = self.get_catalog(overwrite)
[docs]
def __getitem__(self, key):
"""Return AGN catalog column corresponding to key."""
if key not in self.catalog.dtype.names:
return self.catalog_galaxy[key]
return self.catalog[key]
@staticmethod
[docs]
def get_columns(bands: list[str], rfbands: list[str]):
"""
Return the names, types, descriptions, and units of each column.
Parameters
----------
bands: list
egg-style list of apparent magnitude bands
rfbands: list
egg-style list of absolute magnitude bands
Returns
-------
List of 4-tuples of column names, types, descriptions, and units.
"""
return (
[
("ID", np.int64, "unique ID", ""),
("log_lambda_SAR", np.float64, "log10 of specific accretion rate", "erg/s/Msun"),
("is_agn_ctn", bool, "is Compton-thin AGN", ""),
("is_agn_ctk", bool, "is Compton-thick AGN", ""),
("is_agn", bool, "is Compton-thin or Compton-thick AGN", ""),
("log_LX_2_10", np.float64, "log10 of 2-10 keV intrinsic X-ray luminosity", "erg/s"),
(
"log_FX_2_10",
np.float64,
"log10 of 2-10 keV intrinsic X-ray flux at redshift",
"erg/cm2/s",
),
("log_FX_2_7", np.float64, "log10 of 2-7 keV intrinsic X-ray flux at redshift", "erg/cm2/s"),
("is_optical_type2", bool, "is optical AGN type 2", ""),
("E_BV", np.float64, "B-V color extinction", "mag"),
("log_L_2_keV", np.float64, "Monochromatic 2 keV luminosity", "erg/s/Hz"),
("log_L_2500", np.float64, "Monochromatic 2500keV luminosity", "erg/s/Hz"),
("MBH", np.float64, "Black hole mass", "Msun"),
("log_L_bol", np.float64, "Bolometric AGN luminosity", "erg/s"),
("occupation_fraction", np.float64, "SMBH occupation fraction", ""),
("has_bh", bool, "Galaxy contains a SMBH according to the occupation fraction", ""),
]
+ [(b.strip() + "_point", np.float64, f"AGN {b} flux", "uJy") for b in bands]
+ [
("magabs_" + b.strip() + "_point", np.float64, f"AGN {b} absolute magnitude", "ABmag")
for b in bands
]
)
[docs]
def get_dtype(self) -> np.dtype:
"""Return the numpy dtype corresponding to the column names and types."""
i = []
for n, t, _, _ in self.get_columns(self.catalog_galaxy.bands, self.catalog_galaxy.rfbands):
i += [(n, t)]
return np.dtype(i)
[docs]
def get_catalog(self, overwrite):
"""Build the catalog column-by-column and write to FITS."""
# Short-circuit for existing catalog
filename = f"{self.dirname}/agn.fits"
if os.path.exists(filename) and not overwrite:
logger.info(f"Returning an existing AGN catalog {filename}")
return util.read_fits(filename)
logger.info(f"Creating the AGN catalog {filename}")
self.catalog = np.empty_like(self.catalog_galaxy["ID"], dtype=self.get_dtype())
for col, _, _, _ in self.get_columns(self.catalog_galaxy.bands, self.catalog_galaxy.rfbands):
if col in self.catalog_galaxy.get_dtype().names:
self.catalog[col] = self.catalog_galaxy[col]
continue
self.catalog[col] = self.get_agn(col)
# Write the catalog
fitsio.write(filename, self.catalog, clobber=True)
return self.catalog
@staticmethod
[docs]
def _get_seed(key):
"""
Set the random number seed.
The random number seed is generated programmatically from the name of
the column.
Examples
--------
>>> CatalogAGN._get_seed("foo")
33048
>>> CatalogAGN._get_seed("bar")
30282
>>> CatalogAGN._get_seed("baz")
31066
"""
# NOTE: numpy random seed must be between 0 and 2 ** 32 - 1
# NOTE: hash is not persistent between python runs. Use a custom hash
# function instead based on the name of the key
seed = ord(key[0]) * sum(ord(k) for k in key)
return seed % (2**32 - 1)
[docs]
def get_agn(self, key):
"""Get AGN attribute corresponding to the column 'key'."""
logger.info(f"Getting AGN attribute {key}")
ret = None
func = getattr(self, f"get_{key}", None)
# Get the seed
seed = self._get_seed(key)
np.random.seed(seed)
logger.info(f"Setting random seed to {seed}")
if func:
ret = func()
exec(f"self.{key} = ret")
elif "_point" in key:
ret = self.get_flux_agn(
band=key.replace("magabs_", "").replace("_point", ""), rest_frame="magabs_" in key
)
return ret
[docs]
def get_log_lambda_SAR(self, add_ctk=True) -> np.ndarray:
"""Return log10 of the specific black hole accretion rate in erg/s/Msun."""
if self.type_plambda.lower() != "zou+2024":
raise ValueError
# Get the relevant galaxy properties
m = self["M"]
z = self["Z"]
p = self["PASSIVE"]
log_lambda_SAR = np.full_like(z, np.nan)
# NOTE: short-circuit for CTN only as it is significantly faster
if not add_ctk:
logger.info("Assigning log_lambda_SAR for CTN...")
for t in ["star-forming", "quiescent"]:
select = p == (1 if t == "quiescent" else 0)
U = np.random.rand(select.sum())
log_lambda_SAR[select] = zou2024.get_inv_log_Plambda(np.log10(U), m[select], z[select], t)
return log_lambda_SAR
logger.info("Building the arguments...")
base_seed = self._get_seed("log_lambda_SAR")
arguments = [
(i, self.catalog.size, m[i], z[i], "quiescent" if p[i] == 1 else "star-forming", base_seed + i)
for i in np.arange(self.catalog.size)
]
cpu_count = multiprocessing.cpu_count()
logger.info(f"Assigning log_lambda_SAR N={log_lambda_SAR.size} with {cpu_count=}...")
t0 = time.time()
with multiprocessing.Pool(processes=cpu_count) as p:
log_lambda_SAR = p.starmap(util.get_log_lambda_SAR, arguments)
t1 = time.time()
logger.info(f"Assigned {len(log_lambda_SAR)} objects in {t1 - t0} seconds")
return np.array(log_lambda_SAR)
[docs]
def get_log_LX_2_10(self):
"""Return log10 of the intrinsic 2-10 keV X-ray luminosity in erg/s."""
# NOTE: convention of assigning negative lambda to CTK AGN
x = np.where(np.isfinite(self["log_lambda_SAR"]), np.abs(self["log_lambda_SAR"]), -np.inf)
return x + self["M"]
[docs]
def get_log_FX_2_10(self, Gamma=1.9):
"""Return log10 of the intrinsic 2-10 keV X-ray flux in erg/s/cm2 in the observer frame."""
z = self["Z"].astype(np.float64)
d = self["D"].astype(np.float64) * u.Mpc.to(u.cm)
# NOTE: 1 + z dependence from K-correction
return self["log_LX_2_10"] - np.log10(4 * np.pi * d**2) + (2 - Gamma) * np.log10(1 + z)
[docs]
def get_log_FX_2_7(self, Gamma=1.9):
"""
Return log10 of the intrinsic 2-7 keV X-ray flux in erg/s/cm2 in the observer frame.
Band conversion is done from the 2-10 keV assuming power-law index
'Gamma'.
Parameters
----------
Gamma: float
Assumed power-law index for the band conversion.
"""
return np.log10(util.convert_flux(10 ** self["log_FX_2_10"], 2.0, 10.0, 2.0, 7.0, Gamma=Gamma))
[docs]
def get_log_L_2_keV(self, Gamma=1.9, wavelength=6.2):
"""
Return monochromatic X-ray luminosity at lambda = wavelength in erg/s/Hz.
To be used for the alpha_ox Lx = restframe 2-10 kev luminosity.
maybe we can work directly with frequencies and have it in one line
"""
Lx = 10 ** self["log_LX_2_10"]
K = (Lx / (6.2 ** (Gamma - 2) - 1.24 ** (Gamma - 2))) * (Gamma - 2)
return np.log10((K * wavelength ** (Gamma - 1)) / 2.998e18)
[docs]
def get_log_L_2500(self, alpha=0.952, beta=2.138, scatter=0.40):
"""
Return the 2500 angstrom° monochromatic luminosity (in erg/s).
Uses Lusso+10 Eq. 5 (inverted) assuming Lx = alpha L_opt - beta
"""
return lusso2010.get_log_luminosity_2500(self["log_L_2_keV"], alpha, beta, scatter)
[docs]
def get_is_optical_type2(self, func_get_f_obs=None, use_f_obs_for_ctk_agn=False):
"""
Return boolean vector selecting optical type2 AGN.
The optical type2 AGN fraction is evaluated based on Merloni+2014
assuming obscured fraction as a function of intrinsic X-ray luminosity
and redshift.
"""
select_ctk = self["is_agn_ctk"]
if func_get_f_obs is None:
func_get_f_obs = self.merloni2014.get_f_obs
f_obs = func_get_f_obs(self["Z"], self["log_LX_2_10"])
# Randomize type2 based on obscured fraction
ret = np.random.rand(self["Z"].size) < f_obs
if not use_f_obs_for_ctk_agn:
ret[select_ctk] = True
return ret
[docs]
def get_E_BV(
self,
# alpha_1=15.19373112, # NOTE: These type1 AGN parameters are from COSMOS (Bon+).
# n_1=1.58310706,
alpha_1=7.93483055, # NOTE: these parameters are derived from Zou+ catalog from LSST DDFs.
n_1=2.97565676,
alpha_2=11.6133635,
n_2=1.42972,
mu_type_2=0.3,
ebv_ctk=EBV_MAX,
):
"""
Return AGN reddening E(B-V) in ABmag.
The E(B-V) is assumed to be different for type1 and type2 AGN, while
CTK AGN are assumed to be all type2.
"""
type_1_ebv = np.linspace(0, 1, 101)
type_2_ebv = np.linspace(0, 3, 301)
def sample_ebv(N_AGN, probability_distribution, ebv_range, *args):
cumulative = np.cumsum(probability_distribution(ebv_range, *args))
cumulative /= np.max(cumulative)
return np.interp(np.random.rand(N_AGN), cumulative, ebv_range)
def hopkins04(x, alpha, n):
y = 1 / (1 + (x * alpha) ** n)
return y / np.trapezoid(y, x)
ebv = np.empty(self.catalog.size)
type_1_optical = ~self["is_optical_type2"]
type_2_optical = self["is_optical_type2"]
# N_type_1 = np.sum(type_1_optical)
# N_type_2 = np.sum(type_2_optical)
ebv[type_1_optical] = sample_ebv(type_1_optical.sum(), hopkins04, type_1_ebv, alpha_1, n_1)
ebv[type_2_optical] = (
sample_ebv(type_2_optical.sum(), hopkins04, type_2_ebv, alpha_2, n_2) + mu_type_2
)
## NOTE: set CTK AGN E(B-V) to 9 (arbitrarily high number) manually...
if ebv_ctk:
is_ctk = self["is_agn_ctk"]
ebv[is_ctk] = ebv_ctk
return ebv
[docs]
def get_is_agn_ctn(self):
"""Render a random sample of AGN "active" according to the duty cycle."""
# NOTE: avoid non-AGN and CTK AGN when assigning this flag
return np.isfinite(self["log_lambda_SAR"]) * self["log_lambda_SAR"] >= 32.00
[docs]
def get_is_agn_ctk(self):
"""Assign CTK AGN fraction randomly."""
# NOTE: convention of assigning negative lambda to CTK AGN
x = self["log_lambda_SAR"]
is_agn_ctk = np.isfinite(x) * (x <= -32)
return is_agn_ctk
[docs]
def get_is_agn(self):
"""Return boolean vector selecting CTN or CTK AGN."""
return self["is_agn_ctn"] + self["is_agn_ctk"]
[docs]
def get_sed(self, i):
"""
Return an AGN SED.
Parameters
----------
i: int
ID of AGN for which SED is returned.
Returns
-------
lambda: array_like
Wavelength in microns.
flux: array_like
observer frame flux in microjanskies.
"""
filename = f"{self.dirname}/seds/agn-seds-{i}.fits"
if not os.path.exists(filename):
return None
fits = util.read_fits(filename)
return fits["LAMBDA"][0], fits["FLUX"][0]
[docs]
def _get_sed(self, i, ratio_max: float = 0.90, dlog_wav: float = 7.65e-4):
"""
Generate an AGN SED.
Parameters
----------
i: int
ID of AGN for which SED is returned.
ratio_max: float, optional
Maximum allowed ratio flux_agn / flux_total for type2 AGN.
dlog_wav: float, optional
SED wavelength resolution in dex.
"""
logger.debug(f"Getting AGN SED {i} {self.catalog.size}")
# Get the wavlen in angstrom
# NOTE: Add some interesting wavelengths for greater accuracy
wavlen = 10 ** np.arange(np.log10(500), np.log10(250000) + dlog_wav, dlog_wav)
wavlen = np.append(wavlen, [1450, 4400, 5007, 150000])
wavlen = np.sort(wavlen)
# Populate the SED
while True:
# Initialize the rng seed
np.random.seed(self["ID"][i])
agn_sed = qsosed.Quasar_sed(
LogL2500=self["log_L_2500"][i],
AGN_type=1 + self["is_optical_type2"][i],
ebv=self["E_BV"][i],
physical_units=True,
wavlen=wavlen,
LogL2kev=self["log_L_2_keV"][i],
add_NL=self["is_optical_type2"][i],
NL_normalization="lamastra",
Av_lines=self.catalog_galaxy["AVLINES_BULGE"][i] + self.catalog_galaxy["AVLINES_DISK"][i],
**dict(
zip(
self.qsogen_posterior_distribution.parameter_names,
*self.qsogen_posterior_distribution.posterior_distribution.sample().values,
strict=False,
)
),
)
# Check for type2 AGN flux NOT exceeding the host galaxy flux by
# some limit
if self["is_optical_type2"][i] and self["E_BV"][i] <= EBV_MAX:
lam, flux_agn = util.luminosity_to_flux(
agn_sed.wavlen.value,
agn_sed.lum.value,
redshift=0.0,
distance_in_cm=10 * u.pc.to(u.cm),
)
flux_agn = sed.get_flux_band(lam, flux_agn, band="mock-1000-4000", filter_db=self.filter_db)
mag_gal = self.catalog_galaxy["magabs_mock-1000-4000"][i]
flux_gal = util.mag_to_flux(mag_gal)
ratio = flux_agn / (flux_gal + flux_agn)
if ratio > ratio_max:
logger.info(
"AGN 1000-4000 angstrom >90% of the total... Incrementing E(B-V) by 0.10..."
+ f" {self['ID'][i]:6d}"
+ f" {np.log10(flux_agn):.2f}"
+ f" {np.log10(flux_gal):.2f}"
+ f" {ratio:.2f}"
+ f" {self['E_BV'][i]:.2f}"
+ f" {(self['E_BV'][i] + 0.10):.2f}"
)
self["E_BV"][i] += 0.10
continue
break
# Write the file if requested
if self.save_sed:
filename = self.dirname + f"/seds/agn-seds-{self.catalog[i]['ID']}.fits"
lam, flux = util.luminosity_to_flux(
agn_sed.wavlen.value,
agn_sed.lum.value,
redshift=self["Z"][i],
distance_in_cm=self["D"][i] * u.Mpc.to(u.cm),
)
sed.write(filename, lam, flux)
# NOTE: optimization: save values from the sed for future use
for b, r in product(map(str.strip, self.catalog_galaxy.bands), [False, True]):
FLUXES[i, b, r] = self._get_flux_single(agn_sed, b, r, self["Z"][i], self["D"][i])
return agn_sed
[docs]
def _get_flux_single(self, my_sed, band, rest_frame, redshift, distance):
"""
Return a single bandbass flux by integrating the SED.
Take a rest-frame intrinsic SED and integrate it along a filter
passband. The SED is shifted to either rest-frame (10pc) or observer
frame in accordance with the parameters.
Parameters
----------
my_sed: QSOGEN SED object
Input QSOGEN SED.
band: str
EGG-style passband name.
rest_frame: bool
Is rest_frame (else observer frame).
redshift: float
Redshift of the SED
distance: float
Distance at which to evaluate the flux.
"""
# Get the flux in observed frame or rest frame
if rest_frame:
redshift = 0
distance = 1e-5 # NOTE: 10pc in Mpc
lam, flux = util.luminosity_to_flux(
my_sed.wavlen.value,
my_sed.lum.value,
redshift=redshift,
distance_in_cm=distance * u.Mpc.to(u.cm),
)
# Get the flux for the requested band
flux_band = sed.get_flux_band(lam, flux, band, filter_db=self.filter_db)
# NOTE: convert to magnitudes for the rest_frame
if rest_frame:
flux_band = util.flux_to_mag(flux_band)
return flux_band
[docs]
def get_flux_agn(self, band, rest_frame, idxs=None):
"""
Return vector of AGN pass-band fluxes.
Parameters
----------
band: str
Band to estimate the flux in e.g. 'lsst-r'.
rest_frame: bool
Return rest-frame absolute magnitude instead of observed flux.
idxs: List[int] or None
If defined, return a vector containing the fluxes for the given
indices. Otherwise returns the fluxes for the full catalog
including zero fluxes for non-AGN galaxies.
Returns
-------
flux_or_magabs: float
Estimated AGN flux(es) or absolute magnitude(s) based on the given
flags.
"""
flux = np.full(self.catalog.size, np.inf if rest_frame else 0.0)
select_idx = None
if idxs is None:
idxs = self["ID"][self["is_agn"]]
else:
select_idx = np.isin(np.arange(flux.size), idxs)
for idx in np.atleast_1d(idxs):
if (idx, band, rest_frame) not in FLUXES:
_ = self._get_sed(idx)
flux[idx] = FLUXES[idx, band, rest_frame]
if idxs is not None:
flux = flux[select_idx]
return np.squeeze(flux)
[docs]
def get_log_L_bol(self):
"""Return log10 of the AGN bolometric luminosity in erg/s."""
# NOTE: this function initializes the SEDs...
log_L_bol = np.full(self.catalog.size, -np.inf)
for i in self["ID"][self["is_agn"]]:
my_sed = self._get_sed(i)
log_L_bol[i] = np.log10(my_sed.Lbol)
return log_L_bol
[docs]
def get_MBH(
self,
sigma_total=0.50,
offset=0.00,
sigma_intrinsic=False,
sigma_observational=None,
select=None,
*args,
**kwargs,
):
"""
Return log10 of the supermassive black hole mass in Msun.
The MBH is assigned in accordance with the continuity equation after
which log-normal scatter is added to the final value of the MBH.
"""
# Get MBH
log_mbh = get_log_mbh_continuity_new2(self["M"], self["Z"], *args, **kwargs)
# Add the scatter
sigma_tot = np.random.normal(scale=sigma_total, size=log_mbh.size)
# NOTE: add the offset?
return log_mbh + sigma_tot + offset
[docs]
def get_log_L_AGN_5_GHz(self, scatter=True):
"""Return AGN radio luminosity according to the fundamental plane Gultekin+19."""
# Eq. 5
# mu = mu0 + xi_mu_R * R + xi_mu_X * X
# mu = log10(M / 1e8)
# R = log10(L_R / 1e38)
# X = log10(L_X / 1e40)
# NOTE: check gultekin sec 4.7 eq. 19
N = self.catalog.size
X = self["log_LX_2_10"] - 40
mu = self["MBH"] - 8
if scatter:
A = np.random.normal(-0.62, 0.16, N)
B = np.random.normal(0.70, 0.085, N)
# C = np.random.normal(0.74, 0.06, N)
else:
A = np.random.normal(-0.62, 0.0, N)
B = np.random.normal(0.70, 0.0, N)
# C = np.random.normal(0.74, 0.0, N)
R = A + B * X + mu
return R + 38
[docs]
def get_log_lambda_Edd(self, log_l_edd=LOG_LAMBDA_EDD):
"""Get the accretion ratio with respect to Eddington."""
return self["log_L_bol"] - log_l_edd - self["MBH"]
@staticmethod
[docs]
def get_magnitude_agn_dereddened(key, mag, ebv):
"""
Return a band-dependent dereddened optical/UV magnitude.
If a different key is given, simply return the value corresponding to
the key.
Parameters
----------
key: str
band e.g. "mock-4400" for the B-band
mag: float or array_like
vector of magnitudes to be dereddened
ebv: float or array_like
vector of E(B-V) values for the dereddening
"""
# NOTE: discussion with Ivano on 20250227
# updated with 12.6236 for UV from Ivano 20250303
a = 0.0
if ("johnson-B" in key) or ("mock-4400" in key):
logger.info(f"Dereddening in the B-band ({key})")
a = 4.0
elif "mock-1450" in key:
logger.info(f"Dereddening in the UV-band ({key})")
a = 12.6236
return mag - a * ebv
[docs]
def get_occupation_fraction(self):
"""
Return black hole occupation fraction.
The black hole occupation fraction corresponds to the probability that
a host galaxy with mass Mstar hosts a (super)massive black hole. This
could especially prevalent in the dwarf galaxy (<1e9 Msun) regime.
The black hole occupation fraction applied here is based on Zou+2025
(accepted).
"""
from lsst_inaf_agile.mbh import get_occupation_fraction
return get_occupation_fraction(self["M"])
[docs]
def get_has_bh(self):
"""
Return whether the galaxy contains a SMBH according to the occupation fraction.
"""
U = np.random.rand(self["M"].size)
return self["occupation_fraction"] > U
[docs]
def get_lightcurve(self, i, band, *args, **kwargs):
"""
Return an AGN lightcurve.
The lightcurve for an AGN is returned for the given band and MJD.
Parameters
----------
i: int
ID of AGN for which SED is returned.
band: str
EGG-style passband name.
"""
# Short-circuit for non-AGN
is_agn = self["is_agn"][i]
if not is_agn:
return None
filename = f"{self.dirname}/lightcurves/agn/{i}/{band}.npy"
if os.path.exists(filename):
logger.debug(f"Reading AGN lightcurve {filename}")
return np.load(filename)
mjd = util.get_mjd_vec()
kwargs.update(
{
"mjd0": 0.0,
"mjd": mjd,
"band": band,
"flux": self[f"{band}_point"][i],
"z": self["Z"][i],
"mag_i": self["magabs_lsst-i_point"][i],
"logMbh": self["MBH"][i],
"type2": self["is_optical_type2"][i],
"T": mjd.max() + 1,
"deltatc": 1,
"seed": self._get_seed("get_lightcurve") + i,
}
)
logger.info(f"Estimating AGN lightcurve {i} {band}")
lc = lightcurve.get_lightcurve_agn(*args, **kwargs)
logger.info(f"Writing AGN lightcurve {filename}")
util.create_directory(filename)
np.save(filename, lc)
return lc
[docs]
def get_lightcurve_mjd(self, i, band, mjd, mjd0):
"""
Return an AGN lightcurve at given MJD.
The lightcurve is first estimated assuming the timespan starting from
0. The lightcurve is then evaluated at MJD - MJD0 to return the final
value of the lightcurve.
Parameters
----------
i: int
ID of AGN for which SED is returned.
band: str
EGG-style passband name.
mjd: float or array_like
MJD of observation.
mjd0: float
MJD offset corresponding to the first light of the survey.
Returns
-------
The value of the lightcurve at given MJD in microjanskies.
Raises
------
ValueError
If mjd + mjd0 is not within [0, ten years].
"""
is_in_range = np.atleast_1d((mjd - mjd0 >= 0) & (mjd - mjd0 < 10 * 365.25))
if not np.all(is_in_range):
raise ValueError("mjd + mjd0 must be within [0, 10 year]")
lc = self.get_lightcurve(i, band)
if lc is None:
return None
return np.interp(mjd - mjd0, util.get_mjd_vec(), lc)