Source code for lsst_inaf_agile.catalog_agn

#!/usr/bin/env python3
# Author: Akke Viitanen
# Email: akke.viitanen@helsinki.fi
# Date: 2023-02-13 16:20:51

"""Create Galaxy+AGN mocks for the LSST Italian AGN in-kind contribution."""

import logging
import multiprocessing
import os
import time
from dataclasses import dataclass
from itertools import product

import astropy.units as u
import fitsio
import numpy as np
import pandas as pd
from mock_catalog_SED.qsogen_4_catalog import qsosed

from lsst_inaf_agile import lightcurve, lusso2010, sed, util, zou2024
from lsst_inaf_agile.catalog_galaxy import CatalogGalaxy
from lsst_inaf_agile.mbh import get_log_mbh_continuity_new2
from lsst_inaf_agile.merloni2014 import Merloni2014

[docs] logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
[docs] formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
[docs] handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG) handler.setFormatter(formatter) logger.addHandler(handler) # Eddington ratio in cgs
[docs] LOG_LAMBDA_EDD = np.log10(1.26e38)
# Maximum allowed E(B-V)
[docs] EBV_MAX = 9.0
# Optimization: store the pre-computed AGN fluxes # key is (ID, band, is_rest_frame)
[docs] FLUXES: dict[tuple[int, str, bool], float] = {}
@dataclass
[docs] class QsogenPosteriorDistribution:
[docs] filename: str = "data/posteriors/posterior_frozen.dat"
[docs] posterior_distribution = pd.read_csv(filename, sep=" ")
[docs] parameter_names = posterior_distribution.columns
[docs] QPODI = QsogenPosteriorDistribution()
[docs] class CatalogAGN: """ AGN catalog class. Parameters ---------- dirname: str directory name to write the catalog to catalog_galaxy: CatalogGalaxy underlying galaxy catalog type_plambda: str assumed p(lambda) save_sed: bool save AGN SEDs to file seed: int random number seed merloni2014: Merloni2014 assumed Merloni2014 obscuration model filter_db: str, optional filename of the EGG filter database (db.dat) overwrite: bool, optional overwrite an existing AGN catalog """ def __init__( self, dirname: str, catalog_galaxy: CatalogGalaxy, type_plambda: str, save_sed: bool, seed: int, merloni2014: Merloni2014, filter_db: str = "data/egg/share/filter-db/db.dat", qsogen_posterior_distribution: QsogenPosteriorDistribution = QPODI, overwrite: bool = False, ): """ Initialize the AGN catalog. """
[docs] self.dirname = dirname
[docs] self.catalog_galaxy = catalog_galaxy
[docs] self.type_plambda = type_plambda
[docs] self.save_sed = save_sed
[docs] self.seed = seed
[docs] self.merloni2014 = merloni2014
[docs] self.filter_db = filter_db
[docs] self.qsogen_posterior_distribution = qsogen_posterior_distribution
# NOTE: mock-1000-4000 column is needed in the galaxy catalog if "magabs_mock-1000-4000" not in self.catalog_galaxy.catalog.dtype.names: raise ValueError("Galaxy catalog must have magabs_mock-1000-4000 flux column") # Set the seed if self.seed is not None: self.seed = int(self.seed) np.random.seed(self.seed) # Create the catalog
[docs] self.catalog = self.get_catalog(overwrite)
[docs] def __getitem__(self, key): """Return AGN catalog column corresponding to key.""" if key not in self.catalog.dtype.names: return self.catalog_galaxy[key] return self.catalog[key]
@staticmethod
[docs] def get_columns(bands: list[str], rfbands: list[str]): """ Return the names, types, descriptions, and units of each column. Parameters ---------- bands: list egg-style list of apparent magnitude bands rfbands: list egg-style list of absolute magnitude bands Returns ------- List of 4-tuples of column names, types, descriptions, and units. """ return ( [ ("ID", np.int64, "unique ID", ""), ("log_lambda_SAR", np.float64, "log10 of specific accretion rate", "erg/s/Msun"), ("is_agn_ctn", bool, "is Compton-thin AGN", ""), ("is_agn_ctk", bool, "is Compton-thick AGN", ""), ("is_agn", bool, "is Compton-thin or Compton-thick AGN", ""), ("log_LX_2_10", np.float64, "log10 of 2-10 keV intrinsic X-ray luminosity", "erg/s"), ( "log_FX_2_10", np.float64, "log10 of 2-10 keV intrinsic X-ray flux at redshift", "erg/cm2/s", ), ("log_FX_2_7", np.float64, "log10 of 2-7 keV intrinsic X-ray flux at redshift", "erg/cm2/s"), ("is_optical_type2", bool, "is optical AGN type 2", ""), ("E_BV", np.float64, "B-V color extinction", "mag"), ("log_L_2_keV", np.float64, "Monochromatic 2 keV luminosity", "erg/s/Hz"), ("log_L_2500", np.float64, "Monochromatic 2500keV luminosity", "erg/s/Hz"), ("MBH", np.float64, "Black hole mass", "Msun"), ("log_L_bol", np.float64, "Bolometric AGN luminosity", "erg/s"), ("occupation_fraction", np.float64, "SMBH occupation fraction", ""), ("has_bh", bool, "Galaxy contains a SMBH according to the occupation fraction", ""), ] + [(b.strip() + "_point", np.float64, f"AGN {b} flux", "uJy") for b in bands] + [ ("magabs_" + b.strip() + "_point", np.float64, f"AGN {b} absolute magnitude", "ABmag") for b in bands ] )
[docs] def get_dtype(self) -> np.dtype: """Return the numpy dtype corresponding to the column names and types.""" i = [] for n, t, _, _ in self.get_columns(self.catalog_galaxy.bands, self.catalog_galaxy.rfbands): i += [(n, t)] return np.dtype(i)
[docs] def get_catalog(self, overwrite): """Build the catalog column-by-column and write to FITS.""" # Short-circuit for existing catalog filename = f"{self.dirname}/agn.fits" if os.path.exists(filename) and not overwrite: logger.info(f"Returning an existing AGN catalog {filename}") return util.read_fits(filename) logger.info(f"Creating the AGN catalog {filename}") self.catalog = np.empty_like(self.catalog_galaxy["ID"], dtype=self.get_dtype()) for col, _, _, _ in self.get_columns(self.catalog_galaxy.bands, self.catalog_galaxy.rfbands): if col in self.catalog_galaxy.get_dtype().names: self.catalog[col] = self.catalog_galaxy[col] continue self.catalog[col] = self.get_agn(col) # Write the catalog fitsio.write(filename, self.catalog, clobber=True) return self.catalog
@staticmethod
[docs] def _get_seed(key): """ Set the random number seed. The random number seed is generated programmatically from the name of the column. Examples -------- >>> CatalogAGN._get_seed("foo") 33048 >>> CatalogAGN._get_seed("bar") 30282 >>> CatalogAGN._get_seed("baz") 31066 """ # NOTE: numpy random seed must be between 0 and 2 ** 32 - 1 # NOTE: hash is not persistent between python runs. Use a custom hash # function instead based on the name of the key seed = ord(key[0]) * sum(ord(k) for k in key) return seed % (2**32 - 1)
[docs] def get_agn(self, key): """Get AGN attribute corresponding to the column 'key'.""" logger.info(f"Getting AGN attribute {key}") ret = None func = getattr(self, f"get_{key}", None) # Get the seed seed = self._get_seed(key) np.random.seed(seed) logger.info(f"Setting random seed to {seed}") if func: ret = func() exec(f"self.{key} = ret") elif "_point" in key: ret = self.get_flux_agn( band=key.replace("magabs_", "").replace("_point", ""), rest_frame="magabs_" in key ) return ret
[docs] def get_log_lambda_SAR(self, add_ctk=True) -> np.ndarray: """Return log10 of the specific black hole accretion rate in erg/s/Msun.""" if self.type_plambda.lower() != "zou+2024": raise ValueError # Get the relevant galaxy properties m = self["M"] z = self["Z"] p = self["PASSIVE"] log_lambda_SAR = np.full_like(z, np.nan) # NOTE: short-circuit for CTN only as it is significantly faster if not add_ctk: logger.info("Assigning log_lambda_SAR for CTN...") for t in ["star-forming", "quiescent"]: select = p == (1 if t == "quiescent" else 0) U = np.random.rand(select.sum()) log_lambda_SAR[select] = zou2024.get_inv_log_Plambda(np.log10(U), m[select], z[select], t) return log_lambda_SAR logger.info("Building the arguments...") base_seed = self._get_seed("log_lambda_SAR") arguments = [ (i, self.catalog.size, m[i], z[i], "quiescent" if p[i] == 1 else "star-forming", base_seed + i) for i in np.arange(self.catalog.size) ] cpu_count = multiprocessing.cpu_count() logger.info(f"Assigning log_lambda_SAR N={log_lambda_SAR.size} with {cpu_count=}...") t0 = time.time() with multiprocessing.Pool(processes=cpu_count) as p: log_lambda_SAR = p.starmap(util.get_log_lambda_SAR, arguments) t1 = time.time() logger.info(f"Assigned {len(log_lambda_SAR)} objects in {t1 - t0} seconds") return np.array(log_lambda_SAR)
[docs] def get_log_LX_2_10(self): """Return log10 of the intrinsic 2-10 keV X-ray luminosity in erg/s.""" # NOTE: convention of assigning negative lambda to CTK AGN x = np.where(np.isfinite(self["log_lambda_SAR"]), np.abs(self["log_lambda_SAR"]), -np.inf) return x + self["M"]
[docs] def get_log_FX_2_10(self, Gamma=1.9): """Return log10 of the intrinsic 2-10 keV X-ray flux in erg/s/cm2 in the observer frame.""" z = self["Z"].astype(np.float64) d = self["D"].astype(np.float64) * u.Mpc.to(u.cm) # NOTE: 1 + z dependence from K-correction return self["log_LX_2_10"] - np.log10(4 * np.pi * d**2) + (2 - Gamma) * np.log10(1 + z)
[docs] def get_log_FX_2_7(self, Gamma=1.9): """ Return log10 of the intrinsic 2-7 keV X-ray flux in erg/s/cm2 in the observer frame. Band conversion is done from the 2-10 keV assuming power-law index 'Gamma'. Parameters ---------- Gamma: float Assumed power-law index for the band conversion. """ return np.log10(util.convert_flux(10 ** self["log_FX_2_10"], 2.0, 10.0, 2.0, 7.0, Gamma=Gamma))
[docs] def get_log_L_2_keV(self, Gamma=1.9, wavelength=6.2): """ Return monochromatic X-ray luminosity at lambda = wavelength in erg/s/Hz. To be used for the alpha_ox Lx = restframe 2-10 kev luminosity. maybe we can work directly with frequencies and have it in one line """ Lx = 10 ** self["log_LX_2_10"] K = (Lx / (6.2 ** (Gamma - 2) - 1.24 ** (Gamma - 2))) * (Gamma - 2) return np.log10((K * wavelength ** (Gamma - 1)) / 2.998e18)
[docs] def get_log_L_2500(self, alpha=0.952, beta=2.138, scatter=0.40): """ Return the 2500 angstrom° monochromatic luminosity (in erg/s). Uses Lusso+10 Eq. 5 (inverted) assuming Lx = alpha L_opt - beta """ return lusso2010.get_log_luminosity_2500(self["log_L_2_keV"], alpha, beta, scatter)
[docs] def get_is_optical_type2(self, func_get_f_obs=None, use_f_obs_for_ctk_agn=False): """ Return boolean vector selecting optical type2 AGN. The optical type2 AGN fraction is evaluated based on Merloni+2014 assuming obscured fraction as a function of intrinsic X-ray luminosity and redshift. """ select_ctk = self["is_agn_ctk"] if func_get_f_obs is None: func_get_f_obs = self.merloni2014.get_f_obs f_obs = func_get_f_obs(self["Z"], self["log_LX_2_10"]) # Randomize type2 based on obscured fraction ret = np.random.rand(self["Z"].size) < f_obs if not use_f_obs_for_ctk_agn: ret[select_ctk] = True return ret
[docs] def get_E_BV( self, # alpha_1=15.19373112, # NOTE: These type1 AGN parameters are from COSMOS (Bon+). # n_1=1.58310706, alpha_1=7.93483055, # NOTE: these parameters are derived from Zou+ catalog from LSST DDFs. n_1=2.97565676, alpha_2=11.6133635, n_2=1.42972, mu_type_2=0.3, ebv_ctk=EBV_MAX, ): """ Return AGN reddening E(B-V) in ABmag. The E(B-V) is assumed to be different for type1 and type2 AGN, while CTK AGN are assumed to be all type2. """ type_1_ebv = np.linspace(0, 1, 101) type_2_ebv = np.linspace(0, 3, 301) def sample_ebv(N_AGN, probability_distribution, ebv_range, *args): cumulative = np.cumsum(probability_distribution(ebv_range, *args)) cumulative /= np.max(cumulative) return np.interp(np.random.rand(N_AGN), cumulative, ebv_range) def hopkins04(x, alpha, n): y = 1 / (1 + (x * alpha) ** n) return y / np.trapezoid(y, x) ebv = np.empty(self.catalog.size) type_1_optical = ~self["is_optical_type2"] type_2_optical = self["is_optical_type2"] # N_type_1 = np.sum(type_1_optical) # N_type_2 = np.sum(type_2_optical) ebv[type_1_optical] = sample_ebv(type_1_optical.sum(), hopkins04, type_1_ebv, alpha_1, n_1) ebv[type_2_optical] = ( sample_ebv(type_2_optical.sum(), hopkins04, type_2_ebv, alpha_2, n_2) + mu_type_2 ) ## NOTE: set CTK AGN E(B-V) to 9 (arbitrarily high number) manually... if ebv_ctk: is_ctk = self["is_agn_ctk"] ebv[is_ctk] = ebv_ctk return ebv
[docs] def get_is_agn_ctn(self): """Render a random sample of AGN "active" according to the duty cycle.""" # NOTE: avoid non-AGN and CTK AGN when assigning this flag return np.isfinite(self["log_lambda_SAR"]) * self["log_lambda_SAR"] >= 32.00
[docs] def get_is_agn_ctk(self): """Assign CTK AGN fraction randomly.""" # NOTE: convention of assigning negative lambda to CTK AGN x = self["log_lambda_SAR"] is_agn_ctk = np.isfinite(x) * (x <= -32) return is_agn_ctk
[docs] def get_is_agn(self): """Return boolean vector selecting CTN or CTK AGN.""" return self["is_agn_ctn"] + self["is_agn_ctk"]
[docs] def get_sed(self, i): """ Return an AGN SED. Parameters ---------- i: int ID of AGN for which SED is returned. Returns ------- lambda: array_like Wavelength in microns. flux: array_like observer frame flux in microjanskies. """ filename = f"{self.dirname}/seds/agn-seds-{i}.fits" if not os.path.exists(filename): return None fits = util.read_fits(filename) return fits["LAMBDA"][0], fits["FLUX"][0]
[docs] def _get_sed(self, i, ratio_max: float = 0.90, dlog_wav: float = 7.65e-4): """ Generate an AGN SED. Parameters ---------- i: int ID of AGN for which SED is returned. ratio_max: float, optional Maximum allowed ratio flux_agn / flux_total for type2 AGN. dlog_wav: float, optional SED wavelength resolution in dex. """ logger.debug(f"Getting AGN SED {i} {self.catalog.size}") # Get the wavlen in angstrom # NOTE: Add some interesting wavelengths for greater accuracy wavlen = 10 ** np.arange(np.log10(500), np.log10(250000) + dlog_wav, dlog_wav) wavlen = np.append(wavlen, [1450, 4400, 5007, 150000]) wavlen = np.sort(wavlen) # Populate the SED while True: # Initialize the rng seed np.random.seed(self["ID"][i]) agn_sed = qsosed.Quasar_sed( LogL2500=self["log_L_2500"][i], AGN_type=1 + self["is_optical_type2"][i], ebv=self["E_BV"][i], physical_units=True, wavlen=wavlen, LogL2kev=self["log_L_2_keV"][i], add_NL=self["is_optical_type2"][i], NL_normalization="lamastra", Av_lines=self.catalog_galaxy["AVLINES_BULGE"][i] + self.catalog_galaxy["AVLINES_DISK"][i], **dict( zip( self.qsogen_posterior_distribution.parameter_names, *self.qsogen_posterior_distribution.posterior_distribution.sample().values, strict=False, ) ), ) # Check for type2 AGN flux NOT exceeding the host galaxy flux by # some limit if self["is_optical_type2"][i] and self["E_BV"][i] <= EBV_MAX: lam, flux_agn = util.luminosity_to_flux( agn_sed.wavlen.value, agn_sed.lum.value, redshift=0.0, distance_in_cm=10 * u.pc.to(u.cm), ) flux_agn = sed.get_flux_band(lam, flux_agn, band="mock-1000-4000", filter_db=self.filter_db) mag_gal = self.catalog_galaxy["magabs_mock-1000-4000"][i] flux_gal = util.mag_to_flux(mag_gal) ratio = flux_agn / (flux_gal + flux_agn) if ratio > ratio_max: logger.info( "AGN 1000-4000 angstrom >90% of the total... Incrementing E(B-V) by 0.10..." + f" {self['ID'][i]:6d}" + f" {np.log10(flux_agn):.2f}" + f" {np.log10(flux_gal):.2f}" + f" {ratio:.2f}" + f" {self['E_BV'][i]:.2f}" + f" {(self['E_BV'][i] + 0.10):.2f}" ) self["E_BV"][i] += 0.10 continue break # Write the file if requested if self.save_sed: filename = self.dirname + f"/seds/agn-seds-{self.catalog[i]['ID']}.fits" lam, flux = util.luminosity_to_flux( agn_sed.wavlen.value, agn_sed.lum.value, redshift=self["Z"][i], distance_in_cm=self["D"][i] * u.Mpc.to(u.cm), ) sed.write(filename, lam, flux) # NOTE: optimization: save values from the sed for future use for b, r in product(map(str.strip, self.catalog_galaxy.bands), [False, True]): FLUXES[i, b, r] = self._get_flux_single(agn_sed, b, r, self["Z"][i], self["D"][i]) return agn_sed
[docs] def _get_flux_single(self, my_sed, band, rest_frame, redshift, distance): """ Return a single bandbass flux by integrating the SED. Take a rest-frame intrinsic SED and integrate it along a filter passband. The SED is shifted to either rest-frame (10pc) or observer frame in accordance with the parameters. Parameters ---------- my_sed: QSOGEN SED object Input QSOGEN SED. band: str EGG-style passband name. rest_frame: bool Is rest_frame (else observer frame). redshift: float Redshift of the SED distance: float Distance at which to evaluate the flux. """ # Get the flux in observed frame or rest frame if rest_frame: redshift = 0 distance = 1e-5 # NOTE: 10pc in Mpc lam, flux = util.luminosity_to_flux( my_sed.wavlen.value, my_sed.lum.value, redshift=redshift, distance_in_cm=distance * u.Mpc.to(u.cm), ) # Get the flux for the requested band flux_band = sed.get_flux_band(lam, flux, band, filter_db=self.filter_db) # NOTE: convert to magnitudes for the rest_frame if rest_frame: flux_band = util.flux_to_mag(flux_band) return flux_band
[docs] def get_flux_agn(self, band, rest_frame, idxs=None): """ Return vector of AGN pass-band fluxes. Parameters ---------- band: str Band to estimate the flux in e.g. 'lsst-r'. rest_frame: bool Return rest-frame absolute magnitude instead of observed flux. idxs: List[int] or None If defined, return a vector containing the fluxes for the given indices. Otherwise returns the fluxes for the full catalog including zero fluxes for non-AGN galaxies. Returns ------- flux_or_magabs: float Estimated AGN flux(es) or absolute magnitude(s) based on the given flags. """ flux = np.full(self.catalog.size, np.inf if rest_frame else 0.0) select_idx = None if idxs is None: idxs = self["ID"][self["is_agn"]] else: select_idx = np.isin(np.arange(flux.size), idxs) for idx in np.atleast_1d(idxs): if (idx, band, rest_frame) not in FLUXES: _ = self._get_sed(idx) flux[idx] = FLUXES[idx, band, rest_frame] if idxs is not None: flux = flux[select_idx] return np.squeeze(flux)
[docs] def get_log_L_bol(self): """Return log10 of the AGN bolometric luminosity in erg/s.""" # NOTE: this function initializes the SEDs... log_L_bol = np.full(self.catalog.size, -np.inf) for i in self["ID"][self["is_agn"]]: my_sed = self._get_sed(i) log_L_bol[i] = np.log10(my_sed.Lbol) return log_L_bol
[docs] def get_MBH( self, sigma_total=0.50, offset=0.00, sigma_intrinsic=False, sigma_observational=None, select=None, *args, **kwargs, ): """ Return log10 of the supermassive black hole mass in Msun. The MBH is assigned in accordance with the continuity equation after which log-normal scatter is added to the final value of the MBH. """ # Get MBH log_mbh = get_log_mbh_continuity_new2(self["M"], self["Z"], *args, **kwargs) # Add the scatter sigma_tot = np.random.normal(scale=sigma_total, size=log_mbh.size) # NOTE: add the offset? return log_mbh + sigma_tot + offset
[docs] def get_log_L_AGN_5_GHz(self, scatter=True): """Return AGN radio luminosity according to the fundamental plane Gultekin+19.""" # Eq. 5 # mu = mu0 + xi_mu_R * R + xi_mu_X * X # mu = log10(M / 1e8) # R = log10(L_R / 1e38) # X = log10(L_X / 1e40) # NOTE: check gultekin sec 4.7 eq. 19 N = self.catalog.size X = self["log_LX_2_10"] - 40 mu = self["MBH"] - 8 if scatter: A = np.random.normal(-0.62, 0.16, N) B = np.random.normal(0.70, 0.085, N) # C = np.random.normal(0.74, 0.06, N) else: A = np.random.normal(-0.62, 0.0, N) B = np.random.normal(0.70, 0.0, N) # C = np.random.normal(0.74, 0.0, N) R = A + B * X + mu return R + 38
[docs] def get_log_lambda_Edd(self, log_l_edd=LOG_LAMBDA_EDD): """Get the accretion ratio with respect to Eddington.""" return self["log_L_bol"] - log_l_edd - self["MBH"]
@staticmethod
[docs] def get_magnitude_agn_dereddened(key, mag, ebv): """ Return a band-dependent dereddened optical/UV magnitude. If a different key is given, simply return the value corresponding to the key. Parameters ---------- key: str band e.g. "mock-4400" for the B-band mag: float or array_like vector of magnitudes to be dereddened ebv: float or array_like vector of E(B-V) values for the dereddening """ # NOTE: discussion with Ivano on 20250227 # updated with 12.6236 for UV from Ivano 20250303 a = 0.0 if ("johnson-B" in key) or ("mock-4400" in key): logger.info(f"Dereddening in the B-band ({key})") a = 4.0 elif "mock-1450" in key: logger.info(f"Dereddening in the UV-band ({key})") a = 12.6236 return mag - a * ebv
[docs] def get_occupation_fraction(self): """ Return black hole occupation fraction. The black hole occupation fraction corresponds to the probability that a host galaxy with mass Mstar hosts a (super)massive black hole. This could especially prevalent in the dwarf galaxy (<1e9 Msun) regime. The black hole occupation fraction applied here is based on Zou+2025 (accepted). """ from lsst_inaf_agile.mbh import get_occupation_fraction return get_occupation_fraction(self["M"])
[docs] def get_has_bh(self): """ Return whether the galaxy contains a SMBH according to the occupation fraction. """ U = np.random.rand(self["M"].size) return self["occupation_fraction"] > U
[docs] def get_lightcurve(self, i, band, *args, **kwargs): """ Return an AGN lightcurve. The lightcurve for an AGN is returned for the given band and MJD. Parameters ---------- i: int ID of AGN for which SED is returned. band: str EGG-style passband name. """ # Short-circuit for non-AGN is_agn = self["is_agn"][i] if not is_agn: return None filename = f"{self.dirname}/lightcurves/agn/{i}/{band}.npy" if os.path.exists(filename): logger.debug(f"Reading AGN lightcurve {filename}") return np.load(filename) mjd = util.get_mjd_vec() kwargs.update( { "mjd0": 0.0, "mjd": mjd, "band": band, "flux": self[f"{band}_point"][i], "z": self["Z"][i], "mag_i": self["magabs_lsst-i_point"][i], "logMbh": self["MBH"][i], "type2": self["is_optical_type2"][i], "T": mjd.max() + 1, "deltatc": 1, "seed": self._get_seed("get_lightcurve") + i, } ) logger.info(f"Estimating AGN lightcurve {i} {band}") lc = lightcurve.get_lightcurve_agn(*args, **kwargs) logger.info(f"Writing AGN lightcurve {filename}") util.create_directory(filename) np.save(filename, lc) return lc
[docs] def get_lightcurve_mjd(self, i, band, mjd, mjd0): """ Return an AGN lightcurve at given MJD. The lightcurve is first estimated assuming the timespan starting from 0. The lightcurve is then evaluated at MJD - MJD0 to return the final value of the lightcurve. Parameters ---------- i: int ID of AGN for which SED is returned. band: str EGG-style passband name. mjd: float or array_like MJD of observation. mjd0: float MJD offset corresponding to the first light of the survey. Returns ------- The value of the lightcurve at given MJD in microjanskies. Raises ------ ValueError If mjd + mjd0 is not within [0, ten years]. """ is_in_range = np.atleast_1d((mjd - mjd0 >= 0) & (mjd - mjd0 < 10 * 365.25)) if not np.all(is_in_range): raise ValueError("mjd + mjd0 must be within [0, 10 year]") lc = self.get_lightcurve(i, band) if lc is None: return None return np.interp(mjd - mjd0, util.get_mjd_vec(), lc)