Source code for lsst_inaf_agile.catalog_star

#!/usr/bin/env python3
# Author: Akke Viitanen
# Email: akke.viitanen@helsinki.fi
# Date: 2025-07-03 19:47:54

"""Implement CatalogStar class."""

import logging
import os

import numpy as np
import pyvo as vo
from astropy import constants
from astropy import units as u

from . import lightcurve, util

[docs] logger = logging.getLogger()
[docs] class CatalogStar: """ CatalogStar class. Catalog star correspond to the stellar catalog (either star OR binary). Refer to dal Tio+ for the details. """ def __init__(self, dirname, catalog_galaxy, is_binary): """Init CatalogStar. Parameters ---------- dirname: str directory name in which to store the catalogs catalog_galaxy: CatalogGalaxy underlying galaxy catalog (used for determining the area) is_binary: bool True if is binary star catalog. """
[docs] self.dirname = dirname
[docs] self.catalog_galaxy = catalog_galaxy
[docs] self.is_binary = is_binary
[docs] self.filename = dirname + "/stars.fits"
if is_binary: self.filename = dirname + "/binaries.fits"
[docs] self.stars = self.get_stars()
[docs] self.catalog = self.get_catalog()
[docs] def get_catalog(self): """Build the star catalog.""" vals = [] dtype = [] for c1, c2, t in [ ("ra", "RA", np.float64), ("dec", "DEC", np.float64), ("mu0", "D", np.float64), ("umag", "lsst-u_point", np.float64), ("gmag", "lsst-g_point", np.float64), ("rmag", "lsst-r_point", np.float64), ("imag", "lsst-i_point", np.float64), ("zmag", "lsst-z_point", np.float64), ("ymag", "lsst-y_point", np.float64), ("pmracosd", "pmracosd", np.float64), ("pmdec", "pmdec", np.float64), ]: if "mag" in c1: if self.is_binary: c1 = "c3_" + c1 value = self.stars[c1] value = util.mag_to_flux(value) else: value = self.stars[c1] vals.append(value) dtype.append((c2, t)) self.catalog = np.empty_like(self.stars, dtype=dtype) for val, (col, _) in zip(vals, dtype, strict=False): self.catalog[col] = val return self.catalog
[docs] def __getitem__(self, k): """Return column corresponding to 'k'.""" return self.catalog[k]
[docs] def get_stars(self, selection="rmag", maglim=28): """ Query NOIRlab to retrieve the stellar catalogs from LSST-SIM. Parameters ---------- selection: str Column name for selection in terms of magnitude. maglim: float Magnitude limit for the 'selection' column. """ table = "lsst_sim.simdr2" if self.is_binary: table = "lsst_sim.simdr2_binary" selection = "c3_" + selection if not os.path.exists(self.filename): ra, dec = (self.catalog_galaxy[k] for k in ("RA", "DEC")) ra_min = ra.min() ra_max = ra.max() dec_min = dec.min() dec_max = dec.max() query = f""" SELECT * FROM {table} WHERE {ra_min} < ra AND ra < {ra_max} AND {dec_min} < dec AND dec < {dec_max} """ logger.info(query) import warnings from astropy.units import UnitsWarning with warnings.catch_warnings(): warnings.simplefilter("ignore", UnitsWarning) from pyvo.dal.exceptions import DALFormatError try: # Run the syncronous job tap_service = vo.dal.TAPService("https://datalab.noirlab.edu/tap") tap_results = tap_service.search(query) table = tap_results.to_table() table.write(self.filename) except DALFormatError: logger.error( "Could not connect to the TAP service. " "Check your internet connection. " "Returning None." ) return None table = util.read_table(self.filename) if maglim is not None: return table[table[selection] < maglim] return table
[docs] def is_cepheid(self, i): """Return true if star 'i' is a cepheid star.""" label = self.stars["label"][i] pmode = self.stars["pmode"][i] return (label >= 4) & (label <= 6) & (pmode >= 0) & (pmode <= 1)
[docs] def is_c_rich(self, i=None): ret = self.stars["c_o"] > 1 if i is None: return ret return ret[i]
[docs] def is_lpv(self, i, orich=True, crich=True): """Return true if star 'i' is a long-period variable star.""" label = self.stars["label"][i] pmode = self.stars["pmode"][i] select = (label >= 7) & (label <= 8) & (pmode >= 0) & (pmode <= 4) # Select crich/orich only? if crich ^ orich: if orich: select &= ~self.is_c_rich(i) if crich: select &= self.is_c_rich(i) return select
[docs] def get_lightcurve_mjd(self, i, band, mjd, mjd0): """ Estimate a star lightcurve. Convenience function for handling all different classes of stars. """ args = i, band, mjd, mjd0 # Handle binary stars if self.is_binary: lc = self.get_lightcurve_mjd_binary(*args) return self.catalog[f"{band}_point"][i] if lc is None else lc # Handle cepheids if self.is_cepheid(i): return self.get_lightcurve_mjd_cepheid(*args) # Handle LPVs if self.is_lpv(i): return self.get_lightcurve_mjd_lpv(*args, self.is_c_rich(i)) # Fallback case: no variability return self.catalog[f"{band}_point"][i]
[docs] def get_lightcurve_mjd_cepheid(self, i, band, mjd, mjd0): """Estimate a cepheid star lightcurve.""" pmode = self.stars["pmode"][i] period = self.stars[f"period{pmode}"][i] kwargs = { "mjd0": mjd0, "mjd": mjd, "flux": self.catalog[f"{band}_point"][i], "band": band, "z": self.stars["z"][i], "y": self.stars["y"][i], "logte": self.stars["logte"][i], "logl": self.stars["logl"][i], "mass": self.stars["mass"][i], "pmode": pmode, "period": period, "seed": i + 43, } return lightcurve.get_lightcurve_cepheid(**kwargs)
[docs] def get_lightcurve_mjd_lpv(self, i, band, mjd, mjd0, is_c_rich): """Estimate a long-period variable star lightcurve.""" pmode = self.stars["pmode"][i] period = self.stars[f"period{pmode}"][i] kwargs = { "mjd0": mjd0, "mjd": mjd, "band": band, "flux": self.catalog[f"{band}_point"][i], "pmode": pmode, "period": period, "is_c_rich": is_c_rich, "seed": i + 42, } return lightcurve.get_lightcurve_lpv(**kwargs)
[docs] def get_lightcurve_mjd_binary(self, i, band, mjd, mjd0): """Estimate a binary star lightcurve.""" s = self.stars[i] def get_radius_star(mass, log_gravity_surface): g = 10**log_gravity_surface * u.cm / u.s**2 m = mass * u.M_sun return np.sqrt(constants.G * m / g).to(u.R_sun).value i1, i2 = 1, 2 radius1 = get_radius_star(s[f"c{i1}_mass"], s[f"c{i1}_logg"]) radius2 = get_radius_star(s[f"c{i2}_mass"], s[f"c{i2}_logg"]) if not np.isfinite(radius1 + radius2): # NOTE: some stars have nan masses? skip them... return None if radius1 <= radius2: i1, i2 = 2, 1 radius1, radius2 = radius2, radius1 assert radius1 >= radius2 flux1 = util.mag_to_flux(s[f"c{i1}_%smag" % band.replace("lsst-", "")]) flux2 = util.mag_to_flux(s[f"c{i2}_%smag" % band.replace("lsst-", "")]) ret = lightcurve.get_lightcurve_binary( mjd0=mjd0, mjd=mjd, flux1=flux1, radius1=radius1, flux2=flux2, radius2=radius2, p=s["p"] * 365.25, # current orbital period in years (convert to days) a=s["a"], # current semi-major axis in R_sun i=s["i"], # inclination of the orbit in degrees e=s["e"], # eccentricity t0=s["p"] * 365.25 * 0.25, # NOTE: assume the 1st eclipse occurs at 0.25 * p t1=s["p"] * 365.25 * 0.75, # NOTE: assume the 2nd eclipse occurs at 0.75 * p seed=i + 1000, filename=f"{self.dirname}/lightcurves/binaries/{i}/{band}.npy", overwrite=False, ) return ret
@staticmethod
[docs] def get_star_binary_fbin(catalog_star, catalog_binary, fbin=0.40, nrepeat=4, seed=1206): """ Return a binary star catalog according to the description of dal Tio+ and the definition of fbin. According to dal Tio+ Sec. 3.2: (https://iopscience.iop.org/article/10.3847/1538-4365/ac7be6/pdf) `` Therefore, for the moment, we recommend a fbin value of 0.4, as being both most robust (see Dal Tio et al. 2021) and more consistent with the way the stellar densities were originally calibrated in TRILEGAL. As we simulated only one-tenth of expected binaries, the fbin value of 0.4 can be achieved by randomly selecting 60% of single stars and by multiplying by 4 times the number of binary systems present in the same regions. `` Parameters ---------- star: CatalogStar Input single CatalogStar binary: CatalogStar Input binary CatalogStar fbin: float Assumed fbin value. Stars will be downsampled to 1 - fbin of the original size nrepeat: int Number of repeats on binary star catalog. seed: int Random number seed. Returns ------- star2, binary2: tuple[ArrayLike, ArrayLike] Down- and upsampled stellar and binary star catalogs. Examples -------- >>> from astropy.table import Table >>> from . import CatalogStar >>> # Create mock catalogs with sizes (1000, 100) >>> star = Table({"a": [0.0] * 1000}) >>> binary = Table({"a": [0.0] * 100, "b": [0.0] * 100}) >>> star2, binary2 = CatalogStar.get_star_binary_fbin(star, binary) >>> len(star2), len(binary2) (617, 400) >>> # Small catalog will warn about insufficient statistics but succeeds >>> star = Table({"a": [0.0] * 5}) >>> binary = Table({"a": [0.0] * 2, "b": [0.0] * 2}) >>> star2, binary2 = CatalogStar.get_star_binary_fbin(star, binary) >>> len(star2), len(binary2) (2, 8) """ # Set the seed np.random.seed(seed) # Downsample the stellar catalog is_star = np.random.rand(len(catalog_star.stars)) < 1 - fbin # Copy the binary catalog nrepeat times binary2 = np.repeat(catalog_binary.stars, nrepeat) # Copy over ra/dec/etc from the REMAINING stellar catalog # NOTE: in some small catalog cases the number of remaining stars is not # enough to sample for the binary catalog. In these cases, set replace=True is_not_enough = len(binary2) > (~is_star).sum() if is_not_enough: logger.warning( "Small stellar catalog. " "Can not sample binary stars sufficiently. " "Will use replace=True" ) star2 = np.random.choice(catalog_star.stars[~is_star], size=len(binary2), replace=is_not_enough) # Copy over the relevant columns for column in catalog_binary.stars.dtype.names: if column in catalog_star.stars.dtype.names: binary2[column] = star2[column] # Update the two catalogs catalog_star.stars = catalog_star.stars[is_star] catalog_binary.stars = binary2 catalog_star.catalog = catalog_star.get_catalog() catalog_binary.catalog = catalog_binary.get_catalog() assert len(catalog_star.stars) == len(catalog_star.catalog) assert len(catalog_binary.stars) == len(catalog_binary.catalog) return catalog_star, catalog_binary