#!/usr/bin/env python3
# Author: Akke Viitanen
# Email: akke.viitanen@helsinki.fi
# Date: 2024-06-14
"""Lightcurve module for the different objects."""
import logging
import os
import sys
import astropy.units as u
import numpy as np
from AGN_lightcurves_simulations.AGN_sim.DRW_sim import LC2
from lsst_inaf_agile import util
from lsst_inaf_agile.my_lsst import filter_lam_fwhm
[docs]
logger = logging.getLogger(__name__)
[docs]
def get_lightcurve_agn(
mjd0,
mjd,
flux,
band,
z,
mag_i,
logMbh,
type2,
T=3653,
deltatc=1,
seed=None,
meanmag=None,
lambda_rest=None,
):
"""Return an AGN light-curve in flux units."""
# Initialize the time vector
tt = np.arange(0, T, deltatc)
assert np.all((tt.min() <= mjd - mjd0) * (mjd - mjd0 <= tt.max()))
# Figure out the rest-frame wavelength for the filter
if lambda_rest is None:
lambda_of = filter_lam_fwhm[band.replace("lsst-", "")][0] * u.um.to(u.angstrom)
lambda_rest = lambda_of / (1 + z)
# Set the seed
np.random.seed(seed)
# Get the lightcurve
tt, yy, tau, sf_inf, meanmag, sigma = LC2(
T=T,
deltatc=deltatc,
z=z,
mag_i=mag_i,
logMbh=logMbh,
lambda_rest=lambda_rest,
oscillations=False,
A=0.14,
noise=0.00005,
frame="observed",
type2=type2,
meanmag=meanmag,
)
# Convert from magnitudes to fluxes
yy = util.mag_to_flux(yy)
# Normalize to the 10yr mean. The equation for the normalization is
# flux_final(t) = flux_lc(t) / mean(flux_lc) * flux_source
if len(yy.shape) > 1:
yy /= np.mean(yy, axis=1)[:, None]
yy *= flux[:, None]
return np.array([np.interp(mjd - mjd0, tt, _yy) for _yy in yy])
else:
# normalize the lightcurve to unity flux
yy /= np.mean(yy)
# normalizes the lightcurve to the flux of the source
yy *= flux
# plt.plot(util.flux_to_mag(yy), label="after normalization")
return np.interp(mjd - mjd0, tt, yy)
[docs]
def get_lightcurve_cepheid(mjd0, mjd, flux, band, z, y, logte, logl, mass, pmode, period, seed=None):
"""Return a cepheid light-curve."""
from CEP_lsst_quasar_project.get_lightcurve import get_lightcurve_normalized_cepheid
source = {
"z": z,
"y": y,
"logte": logte,
"logl": logl,
"mass": mass,
"pmode": pmode,
f"period{pmode}": period,
f"{band[-1]}mag": util.flux_to_mag(flux),
}
time, flux = get_lightcurve_normalized_cepheid(source, band)
# NOTE: add a random phase to the lightcurve
np.random.seed(seed)
time += np.random.uniform(0, period)
time -= time.min()
ret = np.interp((mjd - mjd0) % period, time, flux)
return ret
[docs]
def get_lightcurve_lpv(mjd0, mjd, flux, band, pmode, period, is_c_rich, seed=None):
"""Return an LPV light-curve."""
# Initialize the return variable for easy short-circuiting
ret = np.full_like(mjd, flux)
# NOTE: Refer to Michele email
if period > 700:
return ret
def get_amplitude_lpv():
lam = filter_lam_fwhm[band[-1]][0]
t = np.loadtxt("data/lpv/table1.dat")
# R = amplitude_band / amplitude_iband
std_iband = np.power(10, 1.5 * np.log10(period) - 4)
idx_r = 3 if is_c_rich else 1
R = np.interp(lam, t[:, 0], t[:, idx_r])
return R * std_iband
# NOTE:
# Implement a sine curve with
# 1) period given by period + random phase
# 2) amplitude given by the std (see Michele email)
# 3) normalization given by the magnitude of the star
#
# lc(phi) = A * np.sin(phi) + B,
#
# where phi = (mjd - mjd0) / period * 2 * np.pi
np.random.seed(seed)
tt = np.arange(0, period, 0.10) + np.random.uniform(0, period)
A = get_amplitude_lpv()
B = util.flux_to_mag(flux)
phi = (2 * np.pi * tt / period) % (2 * np.pi)
yy = A * np.sin(phi) + B
yy = util.mag_to_flux(yy)
return np.interp(mjd - mjd0, tt, yy, period=tt.max())
try:
import batman
except ImportError:
print(
"Warning, could not import 'batman'. Estimating binary lightcurves will not be possible.",
file=sys.stderr,
)
[docs]
def get_lightcurve_binary(
mjd0,
mjd,
flux1,
flux2,
radius1,
radius2,
p,
a,
i,
e,
t0,
t1,
seed=None,
filename=None,
overwrite=False,
return_full=False,
):
"""
Return the primary and secondary lightcurves for the binary system.
Parameters
----------
mjd0:
reference mjd
mjd:
mjd of the observation
flux1:
flux of the primary star
flux2:
flux of the secondary star
radius1:
radius of the primary star in Rsun
radius2:
radius of the secondary star in Rsun
p:
period of the system in units of mjd
a:
orbital semi-major axis of the system in Rsun
i:
orbital inclination of the system in degrees e.g. 90 = edge-on
e:
orbital eccentricity of the system
t0:
time of the primary transit in units of mjd e.g. period * 0.25
t1:
time of the secondary in units of mjd e.g. period * 0.75
seed:
random number seed to offset the phase curve
filename:
write the lightcurve to an output file
overwrite: bool
overwrite the potentially existing lightcurve on the disk
return_full: bool
return the full lightcurve instead of the point estimate
Returns
-------
lc: float
Combined lightucrve of the primary and secondary in the same units as
flux1 and flux2.
"""
# NOTE: short-circuit on existing filename optimization
if filename is not None and os.path.exists(filename) and not overwrite:
if filename not in LCB:
LCB[filename] = np.load(filename).T
t, lc = LCB[filename]
if return_full:
return t, lc
return np.interp((mjd - mjd0) % p + t.min(), t, lc)
# NOTE: there is some weird feature in batman with radius1 == radius2 which
# is prevented by adding a very minor offset to the secondary radius
assert radius1 >= radius2
if np.isclose(radius1, radius2):
radius2 -= 1e-9
# NOTE: longitude of periapsis is not given by dal Tio+. We randomize this
# direction as per Alessandro Mazzi email 20240905.
# Definition is here: https://en.wikipedia.org/wiki/Longitude_of_periapsis
np.random.seed(seed)
w = np.random.uniform(0, np.pi / 2) * 180
def get_param(radius1, radius2, t0):
param = batman.TransitParams() # object to store transit parameters
param.t0 = t0 # time of inferior conjunction
param.a = a / radius1 # semi-major axis (in units of stellar radii)
param.rp = radius2 / radius1 # planet radius (in units of stellar radii)
param.per = p # orbital period in days
param.inc = i # orbital inclination (in degrees)
param.ecc = e # eccentricity
param.w = w # longitude of periastron (in degrees)
param.limb_dark = "quadratic" # limb darkening model
param.u = [0.10, 0.30] # limb darkening coefficients [u1, u2, u3, u4]
# param.fp = flux2 / flux1 # planet to star flux ratio
# param.t_secondary = 0.50 * p * 365.25 # central eclipse time
return param
# NOTE: before simulating the lightcurve, do a test on the depth
param1 = get_param(radius1, radius2, t0)
param2 = get_param(radius2, radius1, t1)
t = np.array([t0, t1])
for param in param1, param2:
model = batman.TransitModel(param, t, transittype="primary")
test = model.light_curve(param)
if np.allclose(test, 1.0):
return None
# NOTE: debugging
if True:
print("", radius1, flux1)
print("", radius2, flux2)
print("", param.t0, "time of inferior conjunction")
print("", param.per, "orbital period in days")
print("", param.rp, "planet radius (in units of stellar radii)")
print("", param.a, "semi-major axis (in units of stellar radii)")
print("", param.inc, "orbital inclination (in degrees)")
print("", param.ecc, "eccentricity")
print("", param.w, "longitude of periastron (in degrees)")
print("", param.limb_dark, "limb darkening model")
print("", param.u, "limb darkening coefficients [u1, u2, u3, u4]")
###############################################################################
# Get the time coordinate for the lightcurve.
#
# Use the dt = 30s exposure time to find the start and end times t1, t2 of
# the eclipse. Then, simulate the eclipse with this accuracy, while the reminder
# of the lightcurve is just a constant value.
ts = []
dt = 15 / 3600.0 / 24.0
for param in param1, param2:
t = np.array([param.t0 - dt, param.t0 + dt])
while True:
model = batman.TransitModel(param, t, transittype="primary")
test = model.light_curve(param)
if np.allclose(test, 1.0):
break
t[0] -= dt
t[1] += dt
ts.append(t)
(t_start_1, t_stop_1), (t_start_2, t_stop_2) = ts
t = np.array(
[0.0]
+ list(np.arange(t_start_1, t_stop_1, dt))
+ list(np.arange(t_start_2, t_stop_2, dt))
+ [param1.per]
)
# Simulate the final lightcurve
def get_lightcurve(param, flux):
# NOTE: the model gives the relative flux of the primary
model = batman.TransitModel(param, t, transittype="primary")
return flux * model.light_curve(param)
lc1 = get_lightcurve(param1, flux1)
lc2 = get_lightcurve(param2, flux2)
lc = lc1 + lc2
# NOTE: offset the lightcurve by a random number
t += np.random.uniform(0, p)
# Save to file?
if filename is not None:
util.create_directory(filename)
np.save(filename, np.vstack([t, lc]).T)
logger.info("Saved", filename)
if return_full:
return t, lc
# Interpolate the lightcurves
# TODO: average over 30s?
return np.interp((mjd - mjd0) % p + t.min(), t, lc)