Source code for lsst_inaf_agile.image_simulator

#!/usr/bin/env python3
# Author: Akke Viitanen
# Email: akke.viitanen@helsinki.fi
# Date: 2025-07-04 20:39:49

"""Image simulation tools."""

import logging
import os
import sqlite3
import textwrap

import pandas as pd

from . import util

[docs] logger = logging.getLogger(__name__)
[docs] class ImageSimulator: """ ImageSimulator class. ImageSimulator interfaces with CatalogCombined in order to write instance catalogs and produce simulated images starting from the truth catalog. The class may further wrap around external image simulation software like galsim. """ def __init__(self, dirname, catalog, filename_baseline): """ Initialize ImageSimulator. Parameters ---------- dirnane: str directory name to write output products to catalog: CatalogCombined underlying combined catalog filename_baseline: str filename of the baseline sqlite3 database to read input values from """
[docs] self.dirname = dirname
[docs] self.catalog = catalog
[docs] self.filename_baseline = filename_baseline
with sqlite3.connect(filename_baseline) as con: self.mjd0 = pd.read_sql_query("SELECT * FROM observations LIMIT 1", con).iloc[0][ "observationStartMJD" ] logger.info(f"First MJD is {self.mjd0=}")
[docs] def get_visit(self, observation_id=None, limit=None): """Get the visit corresponding to the observation_id.""" # Build the query query = "SELECT * FROM observations" if observation_id is not None: query += f" WHERE observationId = {observation_id}" if limit is not None: query += f" LIMIT {limit}" # Query the database with sqlite3.connect(self.filename_baseline) as conn: df = pd.read_sql_query(query, conn) if observation_id is not None: return df.iloc[0] return df
[docs] def get_header(self, observation_id, ra_dec=None): """Return the instance catalog header as a dictionary.""" visit = self.get_visit(observation_id) if ra_dec is None: ra_dec = visit["fieldRA"], visit["fieldDec"] ret = dict( mjd=visit["observationStartMJD"], rightascension=ra_dec[0], declination=ra_dec[1], altitude=visit["altitude"], azimuth=visit["azimuth"], filter="ugrizy".index(visit["filter"]), rotskypos=visit["rotSkyPos"], dist2moon=visit["moonDistance"], moonalt=visit["moonAlt"], moondec=visit["moonDec"], moonphase=visit["moonPhase"], moonra=visit["moonRA"], nsnap=1, obshistid=0, rottelpos=visit["rotTelPos"], seed=observation_id, seeing=visit["seeingFwhmEff"], sunalt=visit["sunAlt"], vistime=visit["visitExposureTime"], numexp=0, seqnum=observation_id % 32768, ) logger.debug(ret) return ret
[docs] def write_instance_catalog( self, observation_id: int, exptime: str = "38 29.2 29.2 29.2 29.2 29.2", ra_dec=None, # override ra_dec? ) -> list[str]: """ Write an instance catalog to a file. Returns ------- filenames: list[str] Filenames that were written as output. """ # Get the header and the catalog header = self.get_header(observation_id, ra_dec) b = "ugrizy"[header["filter"]] band = f"lsst-{b}" catalog = self.get_catalog(band, header["mjd"]) # record the output filenames filenames = [] # Write the header and the catalog filename_instance_catalog = f"{self.dirname}/{observation_id}/instance_catalog.txt" util.create_directory(filename_instance_catalog) filenames.append(filename_instance_catalog) logger.info(f"Writing {filename_instance_catalog}") with open(filename_instance_catalog, "w") as f: for k, v in header.items(): print(f"{k} {v}", file=f) for line in catalog: print(line, file=f) # Write the imsim yaml for detector in range(1, 190): dirname = f"{self.dirname}/{observation_id}" texp = float(exptime.split()[header["filter"]]) imsim_yaml = textwrap.dedent( f"""\ modules: - imsim template: imsim-config-instcat input.instance_catalog.file_name: {filename_instance_catalog} input.instance_catalog.sort_mag: False input.checkpoint.dir: {dirname}/checkpoint input.tree_rings: "" image.sensor: "" output.dir: {dirname}/output output.det_num.first: {detector} output.nfiles: 1 output.nproc: 1 output.timeout: 36000 eval_variables.fexptime: {texp} eval_variables.sband: '{b}'\ """ ) filename_yaml = f"{dirname}/imsim-user-instcat_{detector}.yaml" logger.debug(f"Writing {filename_yaml}") with open(filename_yaml, "w") as f: print(imsim_yaml, file=f) filenames.append(filename_yaml) print(f"Wrote {len(filenames)} under {dirname}") return filenames
[docs] def get_catalog(self, band, mjd): """ Return the instance catalog corresponding to band and mjd. Parameters ---------- band: str flux band detection e.g. 'lsst-g' mjd: int MJD of observation """ is_agn = self.catalog["is_agn"] is_galaxy = self.catalog.get_is_galaxy() is_star = self.catalog.get_is_star() n_galaxy, n_star, n_binary = self.catalog.get_number_galaxy_star_binary() assert is_galaxy.sum() == n_galaxy assert is_star.sum() == n_star + n_binary, (is_star.sum(), n_star, n_binary) uid = self.catalog["ID"] ra = self.catalog["RA"] dec = self.catalog["DEC"] mag_bulge = util.flux_to_mag(self.catalog[f"{band}_bulge"]) mag_disk = util.flux_to_mag(self.catalog[f"{band}_disk"]) mag_point = util.flux_to_mag(self.catalog[f"{band}_point"]) angle_bulge = 90 - self.catalog["BULGE_ANGLE"] angle_disk = 90 - self.catalog["DISK_ANGLE"] d_bulge1, d_bulge2 = util.get_galaxy_ab(self.catalog["BULGE_RADIUS"], self.catalog["BULGE_RATIO"]) d_disk1, d_disk2 = util.get_galaxy_ab(self.catalog["DISK_RADIUS"], self.catalog["DISK_RATIO"]) catalog = [] for i in range(len(self.catalog.catalog_combined)): # Handle star if is_star[i]: # NOTE: dirty idx = self.catalog.get_index_star(i) if n_galaxy <= i < n_galaxy + n_star: # Handle single star lc = self.catalog.catalog_star.get_lightcurve_mjd(idx, band, mjd, self.mjd0) else: # Handle binary star lc = self.catalog.catalog_binary.get_lightcurve_mjd(idx, band, mjd, self.mjd0) mag_point[i] = util.flux_to_mag(lc) catalog += [ f"object {uid[i]} {ra[i]} {dec[i]} {mag_point[i]} " f"Constnu.Template.spec.gz 0 0 0 0 0 0 point none CCM 0.0635117705 3.1" ] # Handle galaxy if is_galaxy[i]: catalog += [ f"object {uid[i]} {ra[i]} {dec[i]} {mag_bulge[i]} " f"Constnu.Template.spec.gz 0 0 0 0 0 0 sersic2D " f"{d_bulge1[i]} {d_bulge2[i]} {angle_bulge[i]} 4 none CCM 0.0635117705 3.1" ] catalog += [ f"object {uid[i]} {ra[i]} {dec[i]} {mag_disk[i]} " f"Constnu.Template.spec.gz 0 0 0 0 0 0 sersic2D " f"{d_disk1[i]} {d_disk2[i]} {angle_disk[i]} 1 none CCM 0.0635117705 3.1" ] # Handle AGN if is_agn[i]: # NOTE: estimate the AGN lightcurve mag_point[i] = util.flux_to_mag( self.catalog.catalog_agn.get_lightcurve_mjd(i, band, mjd, self.mjd0) ) catalog += [ f"object {uid[i]} {ra[i]} {dec[i]} {mag_point[i]} " f"Constnu.Template.spec.gz 0 0 0 0 0 0 point none CCM 0.0635117705 3.1" ] return catalog
[docs] def simulate_image(self, observation_id, detector, logfile=None): """ Simulate a single image. Runs 'galsim' on a single yaml configuration file to simulate a single image. Parameters ---------- observation_id: int observation ID to be simulated from the baseline detector: int which of the 189 detectors to simulate logfile: str name of the logfile. Default is /dev/stdout. A special value of "auto" uses the .yaml file converted to a .log extension. Returns ------- exit_code: exit code from running imSim """ dirname = f"{self.dirname}/{observation_id}" filename_yaml = f"{dirname}/imsim-user-instcat_{detector}.yaml" if logfile is None: logfile = "/dev/stdout" elif logfile == "auto": logfile = filename_yaml.replace(".yaml", ".log") logger.info(f"Simulating image {observation_id=} {detector=} {filename_yaml=} {logfile=}") cmd = f"galsim {filename_yaml} > {logfile} 2>&1" logger.info(f"Running '{cmd}'") exit_code = os.system(cmd) return exit_code