#!/usr/bin/env python3
# Author: Akke Viitanen
# Email: akke.viitanen@helsinki.fi
# Date: 2025-07-03 19:47:54
"""Implement CatalogStar class."""
import logging
import os
import numpy as np
import pyvo as vo
from astropy import constants
from astropy import units as u
from . import lightcurve, util
[docs]
logger = logging.getLogger()
[docs]
class CatalogStar:
"""
CatalogStar class.
Catalog star correspond to the stellar catalog (either star OR binary).
Refer to dal Tio+ for the details.
"""
def __init__(self, dirname, catalog_galaxy, is_binary):
"""Init CatalogStar.
Parameters
----------
dirname: str
directory name in which to store the catalogs
catalog_galaxy: CatalogGalaxy
underlying galaxy catalog (used for determining the area)
is_binary: bool
True if is binary star catalog.
"""
[docs]
self.catalog_galaxy = catalog_galaxy
[docs]
self.is_binary = is_binary
[docs]
self.filename = dirname + "/stars.fits"
if is_binary:
self.filename = dirname + "/binaries.fits"
[docs]
self.stars = self.get_stars()
[docs]
self.catalog = self.get_catalog()
[docs]
def get_catalog(self):
"""Build the star catalog."""
vals = []
dtype = []
for c1, c2, t in [
("ra", "RA", np.float64),
("dec", "DEC", np.float64),
("mu0", "D", np.float64),
("umag", "lsst-u_point", np.float64),
("gmag", "lsst-g_point", np.float64),
("rmag", "lsst-r_point", np.float64),
("imag", "lsst-i_point", np.float64),
("zmag", "lsst-z_point", np.float64),
("ymag", "lsst-y_point", np.float64),
("pmracosd", "pmracosd", np.float64),
("pmdec", "pmdec", np.float64),
]:
if "mag" in c1:
if self.is_binary:
c1 = "c3_" + c1
value = self.stars[c1]
value = util.mag_to_flux(value)
else:
value = self.stars[c1]
vals.append(value)
dtype.append((c2, t))
self.catalog = np.empty_like(self.stars, dtype=dtype)
for val, (col, _) in zip(vals, dtype, strict=False):
self.catalog[col] = val
return self.catalog
[docs]
def __getitem__(self, k):
"""Return column corresponding to 'k'."""
return self.catalog[k]
[docs]
def get_stars(self, selection="rmag", maglim=28):
"""
Query NOIRlab to retrieve the stellar catalogs from LSST-SIM.
Parameters
----------
selection: str
Column name for selection in terms of magnitude.
maglim: float
Magnitude limit for the 'selection' column.
"""
table = "lsst_sim.simdr2"
if self.is_binary:
table = "lsst_sim.simdr2_binary"
selection = "c3_" + selection
if not os.path.exists(self.filename):
ra, dec = (self.catalog_galaxy[k] for k in ("RA", "DEC"))
ra_min = ra.min()
ra_max = ra.max()
dec_min = dec.min()
dec_max = dec.max()
query = f"""
SELECT * FROM {table}
WHERE
{ra_min} < ra AND ra < {ra_max}
AND {dec_min} < dec AND dec < {dec_max}
"""
logger.info(query)
import warnings
from astropy.units import UnitsWarning
with warnings.catch_warnings():
warnings.simplefilter("ignore", UnitsWarning)
from pyvo.dal.exceptions import DALFormatError
try:
# Run the syncronous job
tap_service = vo.dal.TAPService("https://datalab.noirlab.edu/tap")
tap_results = tap_service.search(query)
table = tap_results.to_table()
table.write(self.filename)
except DALFormatError:
logger.error(
"Could not connect to the TAP service. "
"Check your internet connection. "
"Returning None."
)
return None
table = util.read_table(self.filename)
if maglim is not None:
return table[table[selection] < maglim]
return table
[docs]
def is_cepheid(self, i):
"""Return true if star 'i' is a cepheid star."""
label = self.stars["label"][i]
pmode = self.stars["pmode"][i]
return (label >= 4) & (label <= 6) & (pmode >= 0) & (pmode <= 1)
[docs]
def is_c_rich(self, i=None):
ret = self.stars["c_o"] > 1
if i is None:
return ret
return ret[i]
[docs]
def is_lpv(self, i, orich=True, crich=True):
"""Return true if star 'i' is a long-period variable star."""
label = self.stars["label"][i]
pmode = self.stars["pmode"][i]
select = (label >= 7) & (label <= 8) & (pmode >= 0) & (pmode <= 4)
# Select crich/orich only?
if crich ^ orich:
if orich:
select &= ~self.is_c_rich(i)
if crich:
select &= self.is_c_rich(i)
return select
[docs]
def get_lightcurve_mjd(self, i, band, mjd, mjd0):
"""
Estimate a star lightcurve.
Convenience function for handling all different classes of stars.
"""
args = i, band, mjd, mjd0
# Handle binary stars
if self.is_binary:
lc = self.get_lightcurve_mjd_binary(*args)
return self.catalog[f"{band}_point"][i] if lc is None else lc
# Handle cepheids
if self.is_cepheid(i):
return self.get_lightcurve_mjd_cepheid(*args)
# Handle LPVs
if self.is_lpv(i):
return self.get_lightcurve_mjd_lpv(*args, self.is_c_rich(i))
# Fallback case: no variability
return self.catalog[f"{band}_point"][i]
[docs]
def get_lightcurve_mjd_cepheid(self, i, band, mjd, mjd0):
"""Estimate a cepheid star lightcurve."""
pmode = self.stars["pmode"][i]
period = self.stars[f"period{pmode}"][i]
kwargs = {
"mjd0": mjd0,
"mjd": mjd,
"flux": self.catalog[f"{band}_point"][i],
"band": band,
"z": self.stars["z"][i],
"y": self.stars["y"][i],
"logte": self.stars["logte"][i],
"logl": self.stars["logl"][i],
"mass": self.stars["mass"][i],
"pmode": pmode,
"period": period,
"seed": i + 43,
}
return lightcurve.get_lightcurve_cepheid(**kwargs)
[docs]
def get_lightcurve_mjd_lpv(self, i, band, mjd, mjd0, is_c_rich):
"""Estimate a long-period variable star lightcurve."""
pmode = self.stars["pmode"][i]
period = self.stars[f"period{pmode}"][i]
kwargs = {
"mjd0": mjd0,
"mjd": mjd,
"band": band,
"flux": self.catalog[f"{band}_point"][i],
"pmode": pmode,
"period": period,
"is_c_rich": is_c_rich,
"seed": i + 42,
}
return lightcurve.get_lightcurve_lpv(**kwargs)
[docs]
def get_lightcurve_mjd_binary(self, i, band, mjd, mjd0):
"""Estimate a binary star lightcurve."""
s = self.stars[i]
def get_radius_star(mass, log_gravity_surface):
g = 10**log_gravity_surface * u.cm / u.s**2
m = mass * u.M_sun
return np.sqrt(constants.G * m / g).to(u.R_sun).value
i1, i2 = 1, 2
radius1 = get_radius_star(s[f"c{i1}_mass"], s[f"c{i1}_logg"])
radius2 = get_radius_star(s[f"c{i2}_mass"], s[f"c{i2}_logg"])
if not np.isfinite(radius1 + radius2):
# NOTE: some stars have nan masses? skip them...
return None
if radius1 <= radius2:
i1, i2 = 2, 1
radius1, radius2 = radius2, radius1
assert radius1 >= radius2
flux1 = util.mag_to_flux(s[f"c{i1}_%smag" % band.replace("lsst-", "")])
flux2 = util.mag_to_flux(s[f"c{i2}_%smag" % band.replace("lsst-", "")])
ret = lightcurve.get_lightcurve_binary(
mjd0=mjd0,
mjd=mjd,
flux1=flux1,
radius1=radius1,
flux2=flux2,
radius2=radius2,
p=s["p"] * 365.25, # current orbital period in years (convert to days)
a=s["a"], # current semi-major axis in R_sun
i=s["i"], # inclination of the orbit in degrees
e=s["e"], # eccentricity
t0=s["p"] * 365.25 * 0.25, # NOTE: assume the 1st eclipse occurs at 0.25 * p
t1=s["p"] * 365.25 * 0.75, # NOTE: assume the 2nd eclipse occurs at 0.75 * p
seed=i + 1000,
filename=f"{self.dirname}/lightcurves/binaries/{i}/{band}.npy",
overwrite=False,
)
return ret
@staticmethod
[docs]
def get_star_binary_fbin(catalog_star, catalog_binary, fbin=0.40, nrepeat=4, seed=1206):
"""
Return a binary star catalog according to the description of dal Tio+ and the definition of fbin.
According to dal Tio+ Sec. 3.2:
(https://iopscience.iop.org/article/10.3847/1538-4365/ac7be6/pdf)
``
Therefore, for the moment, we recommend a fbin value of 0.4, as
being both most robust (see Dal Tio et al. 2021) and more
consistent with the way the stellar densities were originally
calibrated in TRILEGAL. As we simulated only one-tenth of expected
binaries, the fbin value of 0.4 can be achieved by randomly
selecting 60% of single stars and by multiplying by 4 times the
number of binary systems present in the same regions.
``
Parameters
----------
star: CatalogStar
Input single CatalogStar
binary: CatalogStar
Input binary CatalogStar
fbin: float
Assumed fbin value. Stars will be downsampled to 1 - fbin of the
original size
nrepeat: int
Number of repeats on binary star catalog.
seed: int
Random number seed.
Returns
-------
star2, binary2: tuple[ArrayLike, ArrayLike]
Down- and upsampled stellar and binary star catalogs.
Examples
--------
>>> from astropy.table import Table
>>> from . import CatalogStar
>>> # Create mock catalogs with sizes (1000, 100)
>>> star = Table({"a": [0.0] * 1000})
>>> binary = Table({"a": [0.0] * 100, "b": [0.0] * 100})
>>> star2, binary2 = CatalogStar.get_star_binary_fbin(star, binary)
>>> len(star2), len(binary2)
(617, 400)
>>> # Small catalog will warn about insufficient statistics but succeeds
>>> star = Table({"a": [0.0] * 5})
>>> binary = Table({"a": [0.0] * 2, "b": [0.0] * 2})
>>> star2, binary2 = CatalogStar.get_star_binary_fbin(star, binary)
>>> len(star2), len(binary2)
(2, 8)
"""
# Set the seed
np.random.seed(seed)
# Downsample the stellar catalog
is_star = np.random.rand(len(catalog_star.stars)) < 1 - fbin
# Copy the binary catalog nrepeat times
binary2 = np.repeat(catalog_binary.stars, nrepeat)
# Copy over ra/dec/etc from the REMAINING stellar catalog
# NOTE: in some small catalog cases the number of remaining stars is not
# enough to sample for the binary catalog. In these cases, set replace=True
is_not_enough = len(binary2) > (~is_star).sum()
if is_not_enough:
logger.warning(
"Small stellar catalog. " "Can not sample binary stars sufficiently. " "Will use replace=True"
)
star2 = np.random.choice(catalog_star.stars[~is_star], size=len(binary2), replace=is_not_enough)
# Copy over the relevant columns
for column in catalog_binary.stars.dtype.names:
if column in catalog_star.stars.dtype.names:
binary2[column] = star2[column]
# Update the two catalogs
catalog_star.stars = catalog_star.stars[is_star]
catalog_binary.stars = binary2
catalog_star.catalog = catalog_star.get_catalog()
catalog_binary.catalog = catalog_binary.get_catalog()
assert len(catalog_star.stars) == len(catalog_star.catalog)
assert len(catalog_binary.stars) == len(catalog_binary.catalog)
return catalog_star, catalog_binary