Source code for liger_iris_drp_resources.psfs

import os
import re
import zipfile
import gdown
import numpy as np
from astropy.io import fits

from .utils import get_resource_dir, to_little_endian

import logging
logger = logging.getLogger(__name__)

__all__ = [
    'load_liger_psf',
    'load_iris_psf',
    'download_liger_psfs',
    'download_iris_psfs',
    'download_keck_pupil_image',
    'load_keck_pupil_image'
]

############################
#### Keck Analytic PSFs ####
############################

[docs] def download_keck_pupil_image( output_dir: str | None = None, skip_if_exists: bool = True ) -> str: """ Download the Keck pupil FITS file from Google Drive. Parameters ---------- output_dir : str | None Directory where the FITS file should be saved. skip_if_exists : bool If True, skip download when the target FITS file already exists and is non-empty. Returns ------- str Full path to the downloaded Keck pupil FITS file. """ if output_dir is None: output_dir = _get_liger_psf_dir() os.makedirs(output_dir, exist_ok=True) filename = 'keck_pupil.fits' output_path = os.path.join(output_dir, filename) if skip_if_exists and os.path.exists(output_path) and os.path.getsize(output_path) > 0: logger.info(f"Keck pupil file {output_path} already exists. Skipping download.") return output_path url = 'https://drive.google.com/file/d/1qFT_tHktVf69RAeS09qmgOkz_uyJOd9O/view?usp=drive_link' logger.info(f"Downloading Keck pupil to {output_path}...") downloaded_path = gdown.download( url=url, output=output_path, quiet=False, ) if downloaded_path is None or not os.path.exists(downloaded_path) or os.path.getsize(downloaded_path) == 0: msg = "Failed to download Keck pupil FITS file" logger.error(msg) # Clean up any partial file at the intended output location if os.path.exists(output_path): os.remove(output_path) raise RuntimeError(msg) logger.info(f"Successfully downloaded Keck pupil to {downloaded_path}") return downloaded_path
[docs] def load_keck_pupil_image() -> np.ndarray: """ Load the Keck pupil FITS file and return the pupil image as a numpy array. Returns ------- np.ndarray The Keck pupil image. """ pupil_path = os.path.join(_get_liger_psf_dir(), 'keck_pupil.fits') if not os.path.exists(pupil_path): msg = f"Keck pupil FITS file not found at {pupil_path}. Please run download_keck_pupil_image() first." logger.error(msg) raise FileNotFoundError(msg) with fits.open(pupil_path) as hdulist: pupil_image = hdulist[0].data return to_little_endian(pupil_image)
#################### #### Liger PSFs #### #################### def _get_liger_psf_dir() -> str: return os.path.join(get_resource_dir(), 'PSFs/Liger')
[docs] def download_liger_psfs( output_dir: str | None = None, skip_if_exists: bool = True ) -> str: """ Download the LIGER PSFs from the Google Drive. Parameters ---------- output_dir : str | None The directory to save the PSF folder to. skip_if_exists : bool If True, skip downloading if the output directory already exists and is not empty. Default is True. Returns ------- str The directory containing the downloaded PSF files. """ if output_dir is None: output_dir = _get_liger_psf_dir() if skip_if_exists and os.path.exists(output_dir) and any(os.listdir(output_dir)): logger.info(f"PSF directory {output_dir} already exists and is not empty. Skipping download.") return output_dir os.makedirs(output_dir, exist_ok=True) url = 'https://drive.google.com/file/d/1ZW1ePWObhQTJnwZK02EuPwJOxjCf4xbg/view?usp=drive_link' logger.info(f"Downloading Liger PSFs to {output_dir}...") temp_zip = gdown.download(url=url, output=os.path.join(output_dir, 'liger_psfs.zip'), quiet=False) if temp_zip is None or not os.path.exists(temp_zip): msg = "Failed to download PSFs" logger.error(msg) raise RuntimeError(msg) try: with zipfile.ZipFile(temp_zip, 'r') as zip_ref: extracted_files = zip_ref.namelist() if not extracted_files: msg = "Downloaded PSF zip archive is empty" logger.error(msg) raise RuntimeError(msg) top_level_names = { f.split('/')[0] for f in extracted_files if 'MACOSX' not in f and not f.startswith('.') } if not top_level_names: msg = "No valid top-level folder found in PSF zip archive" logger.error(msg) raise RuntimeError(msg) if len(top_level_names) > 1: logger.warning(f"Multiple top-level entries found in zip: {top_level_names}. Using first sorted entry.") top_level_folder = sorted(top_level_names)[0] zip_ref.extractall(output_dir) final_dir = os.path.join(output_dir, top_level_folder) logger.info(f"Successfully downloaded and extracted Liger PSFs to {final_dir}") return final_dir except zipfile.BadZipFile as e: logger.error(f"Downloaded file {temp_zip} is not a valid zip archive: {e}") raise RuntimeError("Invalid zip archive for Liger PSFs") from e finally: if os.path.exists(temp_zip): os.remove(temp_zip)
def _get_liger_psf_filename( instrument_mode : str | None = None, wave : float | None = None, xs : float | None = None, ys : float | None = None, xdet : float | None = None, ydet : float | None = None, ) -> tuple[str, int]: # Determine "closest" PSFs in wavelength and position xs_ao = np.array([-15, -10, -5, 0, 5, 10, 15]) ys_ao = np.array([-15, -10, -5, 0, 5, 10, 15]) if instrument_mode == 'img': if xdet is not None and ydet is not None: scale = 0.01 # arcsec/pixel xs = scale * xdet + xs_ao[0] ys = scale * ydet + ys_ao[0] xs = xs_ao[np.argmin(np.abs(xs_ao - xs))] ys = ys_ao[np.argmin(np.abs(ys_ao - ys))] elif xs is not None and ys is not None: xs = xs_ao[np.argmin(np.abs(xs_ao - xs))] ys = ys_ao[np.argmin(np.abs(ys_ao - ys))] else: xs = 8.8 ys = 8.8 xs = xs_ao[np.argmin(np.abs(xs_ao - xs))] ys = ys_ao[np.argmin(np.abs(ys_ao - ys))] elif instrument_mode.lower() == 'ifs': xs = 0 ys = 0 xs = xs_ao[np.argmin(np.abs(xs_ao - xs))] ys = ys_ao[np.argmin(np.abs(ys_ao - ys))] if wave is None: logger.warning("Warning: No wavelength provided for PSF selection. Defaulting to H-band (1.65 microns).") wave = 1.65 # microns liger_psf_bps = ['Y', 'J', 'H', 'K'] liger_psf_filter_waves = np.array([1.02, 1.248, 1.65, 2.124]) bp = liger_psf_bps[np.argmin(np.abs(liger_psf_filter_waves - wave))] if bp == 'Y': filename = os.path.join( 'PSFs_LTAO_11_09_19', 'ltao_7x7_YJHK', 'ltao_7_7_hy', f"evlpsfcl_1_x{xs}_y{ys}.fits", ) hdunum = 1 elif bp == 'J': filename = os.path.join( 'PSFs_LTAO_11_09_19', 'ltao_7x7_YJHK', 'ltao_7_7_kj', f"evlpsfcl_1_x{xs}_y{ys}.fits", ) hdunum = 1 elif bp == 'H': filename = os.path.join( 'PSFs_LTAO_11_09_19', 'ltao_7x7_YJHK', 'ltao_7_7_hy', f"evlpsfcl_1_x{xs}_y{ys}.fits", ) hdunum = 0 elif bp == 'K': filename = os.path.join( 'PSFs_LTAO_11_09_19', 'ltao_7x7_YJHK', 'ltao_7_7_hy', f"evlpsfcl_1_x{xs}_y{ys}.fits", ) hdunum = 0 return filename, hdunum
[docs] def load_liger_psf( instrument_mode : str | None = None, wave : float | None = None, xs : float | None = None, ys : float | None = None, xdet : float | None = None, ydet : float | None = None, ) -> tuple[np.ndarray, dict]: """ Load a LIGER PSF for a given wavelength and position. Parameters ---------- instrument_mode : str | None The instrument mode ('img', 'ifs') to determine which PSF to load. wave : float | None The wavelength in nanometers. xs : float | None The x position in arcseconds. ys : float | None The y position in arcseconds. xdet : float | None The x detector position in pixels. ydet : float | None The y detector position in pixels. Returns ------- tuple[np.ndarray, dict] The PSF image and its metadata. """ # Get psf filepath and HDU psf_dir = _get_liger_psf_dir() filename, hdunum = _get_liger_psf_filename( instrument_mode=instrument_mode, wave=wave, xs=xs, ys=ys, xdet=xdet, ydet=ydet ) filepath = os.path.join(psf_dir, filename) # Load PSF psf, info = _read_liger_psf_file(filepath, hdunum) return psf, info
def _parse_liger_psf_loc(filename : str) -> tuple[int, int]: match = re.search(r"_x([-+]?\d+)_y([-+]?\d+)", os.path.basename(filename)) x, y = match.group(1), match.group(2) x, y = int(x), int(y) return x, y def _read_liger_psf_file( filename : str, hdunum : int | None = None, ) -> tuple[np.ndarray, dict]: with fits.open(filename) as hdulist: psf = hdulist[hdunum].data info = _parse_liger_psf_header(hdulist[hdunum].header) info['filename'] = filename info['hdunum'] = hdunum info['position'] = _parse_liger_psf_loc(filename) # Ensure odd shape # NOTE: This needs futher explanation why the PSFs have an even shape # NOTE: We may need to interpolate instead, TBD if psf.shape[0] % 2 == 0: psf = psf[1:, :].copy() if psf.shape[1] % 2 == 0: psf = psf[:, 1:].copy() psf = to_little_endian(psf) return psf, info def _parse_liger_psf_header(header : fits.Header): info = {} info['r0'] = header['R0'] * 1E6 # meters -> microns info['l0'] = header['L0'] * 1E6 # meters -> microns info['wavelength'] = header['WVL'] * 1E6 # meters -> microns info['opd_sampling'] = header['DT'] * 1E6 # meters -> microns info['fft_grid'] = int(header['NFFT'].real) info['psf_sampling'] = header['DP'] info['sum'] = header['SUM'] info['itime'] = header['DT'] info['theta'] = float(header['THETA'].real) return info ################### #### IRIS PSFs #### ################### def _get_iris_psf_dir() -> str: return os.path.join(get_resource_dir(), 'PSFs/IRIS')
[docs] def download_iris_psfs( output_dir: str | None = None, skip_if_exists: bool = True ) -> str: raise NotImplementedError("IRIS PSF download not implemented yet")
[docs] def load_iris_psf( instrument_mode : str, wave : float | None = None, xs : float | None = None, ys : float | None = None, xdet : float | None = None, ydet : float | None = None, itime : float = 300, zenith : str = '45', atm : str = '50', ): """ Get the IRIS PSF for the imager or IFS for the given input parameters. Parameters ---------- instrument_mode : str The instrument mode ('img', 'ifs'). wave : float The wavelength in microns. xs : float | None The x offset in arcsec from on-axis. Default is 0. ys : float | None The y offset in arcsec from on-axis. Default is 0. xdet : float | None The x detector position in pixels. Default is None, which will be converted to xs. ydet : float | None The y detector position in pixels. Default is None, which will be converted to ys. itime : float The integration time in seconds. Default is 300. zenith : str The zenith angle in degrees. Default is '45'. atm : str The atmosphere in percentile. Default is '50'. Returns ------- psf : np.ndarray The PSF image. info : dict The PSF info from the header. """ filename = _get_iris_psf_filename( instrument_mode=instrument_mode, xs=xs, ys=ys, xdet=xdet, ydet=ydet, itime=itime, zenith=zenith, atm=atm, ) filepath = os.path.join(_get_iris_psf_dir(), filename) psf, info = _read_iris_psf(filepath, wave=wave) info['instrument_mode'] = instrument_mode return psf, info
def _get_iris_psf_filename( instrument_mode : str, xdet : float | None = None, ydet : float | None = None, xs : float | None = None, ys : float | None = None, itime : float = 300, zenith : str = '45', atm : str = '50', ) -> str: """ Gets the filename of the PSF for the imager or IFS. Parameters ---------- instrument_mode: str The instrument mode ('img', 'ifs'). xdet : float | None The x detector position. ydet : float | None The y detector position. xs : float | None The x offset in arcsec from on-axis. ys : float | None The y offset in arcsec from on-axis. itime : float The integration time in seconds. zenith : str The zenith angle in degrees. atm : str The atmosphere in percentile. Returns ------- filename : str The filename of the PSF. """ # Resolve input params instrument_mode = instrument_mode.lower() itimes = np.array([1.4, 300]) k = np.argmin(np.abs(itimes - itime)) itime = itimes[k] if itime == int(itime): itime = int(itime) zenith = int(zenith) # Determine filename based on input parameters if instrument_mode == 'img': scale = 0.004 # arcsec/pixel xs_ao = np.array([0.6, 4.7, 8.8, 12.9, 17]) ys_ao = np.array([0.6, 4.7, 8.8, 12.9, 17]) if xdet is not None and ydet is not None: xs = scale * xdet + xs_ao[0] ys = scale * ydet + ys_ao[0] xs = xs_ao[np.argmin(np.abs(xs_ao - xs))] ys = ys_ao[np.argmin(np.abs(ys_ao - ys))] elif xs is not None and ys is not None: xs = xs_ao[np.argmin(np.abs(xs_ao - xs))] ys = ys_ao[np.argmin(np.abs(ys_ao - ys))] else: xs = 8.8 ys = 8.8 if xs == int(xs): xs = int(xs) if ys == int(ys): ys = int(ys) filename = f"za{zenith}_{int(atm)}p_im_{itime}s{os.sep}evlpsfcl_1_x{xs}_y{ys}_2mas.fits" elif instrument_mode == 'ifs': filename = f"za{zenith}_{int(atm)}p_ifu_{itime}s{os.sep}evlpsfcl_1_x0_y0_2mas.fits" else: raise ValueError(f"Unknown instrument_mode '{instrument_mode}'.") return filename def _read_iris_psf( filepath : str, hdunum : int | None = None, wave : float | None = None ) -> tuple[np.ndarray, dict]: """ Read the IRIS PSF file and return the PSF array and metadata. """ if hdunum is None: hdunum = _get_iris_psf_hdu_for_wavelength(filepath, wave) with fits.open(filepath) as hdulist: psf = hdulist[hdunum].data psf = to_little_endian(psf) info = _parse_iris_psf_header(hdulist[hdunum].header) info['filename'] = filepath info['hdunum'] = hdunum info['atm'] = filepath.split('/')[-2][2:4] info['weather'] = filepath.split('/')[-2][5:7] return psf, info def _get_iris_psf_hdu_for_wavelength(filepath : str, wave : float) -> int: """ Get the HDU number for a given wavelength in microns. """ with fits.open(filepath) as hdulist: waves = np.full(len(hdulist), np.nan) for i in range(len(hdulist)): header = hdulist[i].header info = _parse_iris_psf_header(header) waves[i] = info['wavelength'] hdunum = np.argmin(np.abs(waves - wave)) return hdunum def _parse_iris_psf_header(header : fits.Header) -> dict: """ Parse the header of a PSF file. """ # Split into a list comments = "" for comment in header['COMMENT']: comments += comment.strip() comments = comments.split(';') # Result info = {} # Position match = re.search(r'Science PSF at \(([\d.]+),\s*([\d.]+)\)\s*arcsec', comments[0]) info['x'] = match[1] info['y'] = match[2] # r0 match = re.search(r'r0=([\d.]+)', comments[1]) info['r0'] = float(match[1]) # l0 match = re.search(r'l0=([\d.]+)', comments[1]) info['l0'] = float(match[1]) # wavelength match = re.search(r'Wavelength:\s*([\d.eE+-]+)m', comments[2]) info['wavelength'] = 1E6 * float(match[1]) # convert meters to microns # OPD sampling match = re.search(r'OPD Sampling:\s*([\d.]+)m', comments[3]) info['opd_sampling'] = 1E6 * float(match[1]) # convert meters to microns # fft grid match = re.search(r'FFT Grid:\s*(\d+)x(\d+)', comments[4]) info['fft_grid'] = (int(match[1]), int(match[2])) # psf sampling match = re.search(r'PSF Sampling:\s*([\d.eE+-]+)\s*arcsec', comments[5]) info['psf_sampling'] = float(match[1]) # arcsec # Sum match = re.search(r'PSF Sum to:\s*([\d.eE+-]+)', comments[6]) info['sum'] = float(match[1]) # itime match = re.search(r'Exposure:\s*(\d+)s', comments[7]) info['itime'] = float(match[1]) return info