#!/usr/bin/env python3
# Author: Akke Viitanen
# Email: akke.viitanen@helsinki.fi
# Date: 2025-07-04 20:39:49
"""Image simulation tools."""
import logging
import os
import sqlite3
import textwrap
import pandas as pd
from . import util
[docs]
logger = logging.getLogger(__name__)
[docs]
class ImageSimulator:
"""
ImageSimulator class.
ImageSimulator interfaces with CatalogCombined in order to write instance
catalogs and produce simulated images starting from the truth catalog. The
class may further wrap around external image simulation software like
galsim.
"""
def __init__(self, dirname, catalog, filename_baseline):
"""
Initialize ImageSimulator.
Parameters
----------
dirnane: str
directory name to write output products to
catalog: CatalogCombined
underlying combined catalog
filename_baseline: str
filename of the baseline sqlite3 database to read input values from
"""
[docs]
self.filename_baseline = filename_baseline
with sqlite3.connect(filename_baseline) as con:
self.mjd0 = pd.read_sql_query("SELECT * FROM observations LIMIT 1", con).iloc[0][
"observationStartMJD"
]
logger.info(f"First MJD is {self.mjd0=}")
[docs]
def get_visit(self, observation_id=None, limit=None):
"""Get the visit corresponding to the observation_id."""
# Build the query
query = "SELECT * FROM observations"
if observation_id is not None:
query += f" WHERE observationId = {observation_id}"
if limit is not None:
query += f" LIMIT {limit}"
# Query the database
with sqlite3.connect(self.filename_baseline) as conn:
df = pd.read_sql_query(query, conn)
if observation_id is not None:
return df.iloc[0]
return df
[docs]
def write_instance_catalog(
self,
observation_id: int,
exptime: str = "38 29.2 29.2 29.2 29.2 29.2",
ra_dec=None, # override ra_dec?
) -> list[str]:
"""
Write an instance catalog to a file.
Returns
-------
filenames: list[str]
Filenames that were written as output.
"""
# Get the header and the catalog
header = self.get_header(observation_id, ra_dec)
b = "ugrizy"[header["filter"]]
band = f"lsst-{b}"
catalog = self.get_catalog(band, header["mjd"])
# record the output filenames
filenames = []
# Write the header and the catalog
filename_instance_catalog = f"{self.dirname}/{observation_id}/instance_catalog.txt"
util.create_directory(filename_instance_catalog)
filenames.append(filename_instance_catalog)
logger.info(f"Writing {filename_instance_catalog}")
with open(filename_instance_catalog, "w") as f:
for k, v in header.items():
print(f"{k} {v}", file=f)
for line in catalog:
print(line, file=f)
# Write the imsim yaml
for detector in range(1, 190):
dirname = f"{self.dirname}/{observation_id}"
texp = float(exptime.split()[header["filter"]])
imsim_yaml = textwrap.dedent(
f"""\
modules:
- imsim
template: imsim-config-instcat
input.instance_catalog.file_name: {filename_instance_catalog}
input.instance_catalog.sort_mag: False
input.checkpoint.dir: {dirname}/checkpoint
input.tree_rings: ""
image.sensor: ""
output.dir: {dirname}/output
output.det_num.first: {detector}
output.nfiles: 1
output.nproc: 1
output.timeout: 36000
eval_variables.fexptime: {texp}
eval_variables.sband: '{b}'\
"""
)
filename_yaml = f"{dirname}/imsim-user-instcat_{detector}.yaml"
logger.debug(f"Writing {filename_yaml}")
with open(filename_yaml, "w") as f:
print(imsim_yaml, file=f)
filenames.append(filename_yaml)
print(f"Wrote {len(filenames)} under {dirname}")
return filenames
[docs]
def get_catalog(self, band, mjd):
"""
Return the instance catalog corresponding to band and mjd.
Parameters
----------
band: str
flux band detection e.g. 'lsst-g'
mjd: int
MJD of observation
"""
is_agn = self.catalog["is_agn"]
is_galaxy = self.catalog.get_is_galaxy()
is_star = self.catalog.get_is_star()
n_galaxy, n_star, n_binary = self.catalog.get_number_galaxy_star_binary()
assert is_galaxy.sum() == n_galaxy
assert is_star.sum() == n_star + n_binary, (is_star.sum(), n_star, n_binary)
uid = self.catalog["ID"]
ra = self.catalog["RA"]
dec = self.catalog["DEC"]
mag_bulge = util.flux_to_mag(self.catalog[f"{band}_bulge"])
mag_disk = util.flux_to_mag(self.catalog[f"{band}_disk"])
mag_point = util.flux_to_mag(self.catalog[f"{band}_point"])
angle_bulge = 90 - self.catalog["BULGE_ANGLE"]
angle_disk = 90 - self.catalog["DISK_ANGLE"]
d_bulge1, d_bulge2 = util.get_galaxy_ab(self.catalog["BULGE_RADIUS"], self.catalog["BULGE_RATIO"])
d_disk1, d_disk2 = util.get_galaxy_ab(self.catalog["DISK_RADIUS"], self.catalog["DISK_RATIO"])
catalog = []
for i in range(len(self.catalog.catalog_combined)):
# Handle star
if is_star[i]:
# NOTE: dirty
idx = self.catalog.get_index_star(i)
if n_galaxy <= i < n_galaxy + n_star:
# Handle single star
lc = self.catalog.catalog_star.get_lightcurve_mjd(idx, band, mjd, self.mjd0)
else:
# Handle binary star
lc = self.catalog.catalog_binary.get_lightcurve_mjd(idx, band, mjd, self.mjd0)
mag_point[i] = util.flux_to_mag(lc)
catalog += [
f"object {uid[i]} {ra[i]} {dec[i]} {mag_point[i]} "
f"Constnu.Template.spec.gz 0 0 0 0 0 0 point none CCM 0.0635117705 3.1"
]
# Handle galaxy
if is_galaxy[i]:
catalog += [
f"object {uid[i]} {ra[i]} {dec[i]} {mag_bulge[i]} "
f"Constnu.Template.spec.gz 0 0 0 0 0 0 sersic2D "
f"{d_bulge1[i]} {d_bulge2[i]} {angle_bulge[i]} 4 none CCM 0.0635117705 3.1"
]
catalog += [
f"object {uid[i]} {ra[i]} {dec[i]} {mag_disk[i]} "
f"Constnu.Template.spec.gz 0 0 0 0 0 0 sersic2D "
f"{d_disk1[i]} {d_disk2[i]} {angle_disk[i]} 1 none CCM 0.0635117705 3.1"
]
# Handle AGN
if is_agn[i]:
# NOTE: estimate the AGN lightcurve
mag_point[i] = util.flux_to_mag(
self.catalog.catalog_agn.get_lightcurve_mjd(i, band, mjd, self.mjd0)
)
catalog += [
f"object {uid[i]} {ra[i]} {dec[i]} {mag_point[i]} "
f"Constnu.Template.spec.gz 0 0 0 0 0 0 point none CCM 0.0635117705 3.1"
]
return catalog
[docs]
def simulate_image(self, observation_id, detector, logfile=None):
"""
Simulate a single image.
Runs 'galsim' on a single yaml configuration file to simulate a single
image.
Parameters
----------
observation_id: int
observation ID to be simulated from the baseline
detector: int
which of the 189 detectors to simulate
logfile: str
name of the logfile. Default is /dev/stdout. A special value of "auto"
uses the .yaml file converted to a .log extension.
Returns
-------
exit_code:
exit code from running imSim
"""
dirname = f"{self.dirname}/{observation_id}"
filename_yaml = f"{dirname}/imsim-user-instcat_{detector}.yaml"
if logfile is None:
logfile = "/dev/stdout"
elif logfile == "auto":
logfile = filename_yaml.replace(".yaml", ".log")
logger.info(f"Simulating image {observation_id=} {detector=} {filename_yaml=} {logfile=}")
cmd = f"galsim {filename_yaml} > {logfile} 2>&1"
logger.info(f"Running '{cmd}'")
exit_code = os.system(cmd)
return exit_code