# Copyright (c) 2019-2024 Simons Observatory.
# Full license can be found in the top level "LICENSE" file.
import os
import sys
from datetime import datetime, timezone
import re
import numpy as np
import astropy.units as u
from astropy.table import QTable
from toast.instrument import Focalplane, GroundSite, Telescope
from toast.utils import Logger, name_UID
from ..core.hardware import Hardware, build_readout_id, parse_readout_id
from ..sim_hardware import sim_nominal
from .sim_focalplane import sim_telescope_detectors
from so3g.proj.coords import SITES
FOCALPLANE_RADII = {
"LAT": u.Quantity(3.6, u.deg),
"SAT1": u.Quantity(17.8, u.deg),
"SAT2": u.Quantity(17.8, u.deg),
"SAT3": u.Quantity(17.8, u.deg),
"SAT4": u.Quantity(17.2, u.deg),
}
def get_telescope(telescope, wafer_slots, tube_slots):
"""Determine which telescope matches the detector selections"""
if telescope is not None:
return telescope
# User did not set telescope so we infer it from the
# tube and wafer slots
hwexample = sim_nominal()
if wafer_slots is not None:
wafer_slots = wafer_slots.split(",")
wafer_map = hwexample.wafer_map()
tube_slots = [wafer_map["tube_slots"][ws] for ws in wafer_slots]
elif tube_slots is not None:
tube_slots = tube_slots.split(",")
else:
raise RuntimeError("Must set telescope, wafer_slots or tube_slots.")
for tube_slot in tube_slots:
for telescope_name, telescope_data in hwexample.data["telescopes"].items():
if tube_slot in telescope_data["tube_slots"]:
if telescope is None:
telescope = telescope_name
elif telescope != telescope_name:
raise RuntimeError(
f"Tubes '{tube_slots}' span more than one telescope"
)
if telescope is None:
raise RuntimeError(
f"Failed to match tube_slot = '{tube_slot}' with a telescope"
)
return telescope
def get_tele_wafer_band_name(tele, tube, wafer, band):
"""Return a simplified name for one band of a wafer.
There are multiple places where we need to construct a unique string
composed of the telescope name, optics tube, wafer and band within a
wafer. The formal band name includes the telescope, which results
in annoying duplication.
Args:
tele (str): Name of the telescope
tube (str): The optics tube slot
wafer (str): The wafer slot name
band (str): The name of the band in the hardware dictionary
Returns:
(str): The name.
"""
bf_pat = re.compile(r"(.*_)")
band_freq = bf_pat.sub("", band)
return f"{tele}_{tube}_{wafer}_{band_freq}"
class SOSite(GroundSite):
def __init__(
self,
name="ATACAMA",
lat=SITES["so"].lat * u.degree,
lon=SITES["so"].lon * u.degree,
alt=SITES["so"].elev * u.meter,
weather="atacama",
**kwargs,
):
super().__init__(
name,
lat,
lon,
alt,
**kwargs,
)
[docs]
class SOFocalplane(Focalplane):
"""SO Focalplane class.
This can be constructed in several ways:
- From a file while applying selection criteria.
- From an existing (pre-selected) Hardware instance.
- From a nominal sim on the fly with selections.
Note that to support serialization, the simulated hardware dictionary elements
have values in standard units. When constructing our focalplane table we restore
those units and build Quantities.
Args:
hw (Hardware): If specified, construct from a Hardware object in memory.
hwfile (str): If specified, load this hardware model from disk.
det_info_file (str): If simulating a Hardware model, optionally specify
a det_info format file to load
det_info_version (str): The version of the det_info file format.
telescope (str): If not None, select only detectors from this telescope.
sample_rate (Quantity): Use this sample rate for all detectors.
bands (str): Comma separated string of bands to use.
wafer_slots (str): Comma separated string of wafers to use.
tube_slots (str): Comma separated string of tubes to use.
thinfp (int): The factor by which to reduce the number of detectors.
creation_time (float): Optional timestamp to use when building readout_id.
comm (MPI.Comm): Optional MPI communicator.
"""
def __init__(
self,
hw=None,
hwfile=None,
det_info_file=None,
det_info_version=None,
telescope=None,
sample_rate=u.Quantity(10.0, u.Hz),
bands=None,
wafer_slots=None,
tube_slots=None,
thinfp=None,
creation_time=None,
comm=None,
):
log = Logger.get()
meta = dict()
meta["telescope"] = get_telescope(telescope, wafer_slots, tube_slots)
field_of_view = 2 * FOCALPLANE_RADII[meta["telescope"]]
if creation_time is None:
# Use the current time
creation_time = datetime.now(tz=timezone.utc).timestamp()
if hw is None:
if comm is None or comm.rank == 0:
if hwfile is not None:
log.debug(f"Loading hardware configuration from {hwfile}...")
hw = Hardware(hwfile)
elif meta["telescope"] in ["LAT", "SAT1", "SAT2", "SAT3", "SAT4"]:
log.debug("Simulating default hardware configuration")
hw = sim_nominal()
sim_telescope_detectors(
hw,
meta["telescope"],
det_info=(det_info_file, det_info_version),
no_darks=det_info_file is not None,
)
else:
raise RuntimeError(
"Must provide a path to file or a valid telescope name"
)
if (
bands is not None
or wafer_slots is not None
or tube_slots is not None
):
match = dict()
if bands is not None:
match["band"] = bands.replace(",", "|")
if wafer_slots is not None:
match["wafer_slot"] = wafer_slots.split(",")
if tube_slots is not None:
tube_slots = tube_slots.split(",")
hw = hw.select(tube_slots=tube_slots, match=match)
if thinfp is not None:
dets = list(hw.data["detectors"].keys())
for det in dets:
pixel = hw.data["detectors"][det]["pixel"]
try:
pixel_id = int(pixel)
except ValueError:
pixel_id = name_UID(pixel)
if pixel_id % thinfp != 0:
del hw.data["detectors"][det]
ndet = len(hw.data["detectors"])
if ndet == 0:
raise RuntimeError(
f"No detectors match query: telescope={meta['telescope']}, "
f"tube_slots={tube_slots}, wafer_slots={wafer_slots}, "
f"bands={bands}, thinfp={thinfp}"
)
else:
log.debug(
f"{ndet} detectors match query: telescope={meta['telescope']}, "
f"tube_slots={tube_slots}, wafer_slots={wafer_slots}, "
f"bands={bands}, thinfp={thinfp}"
)
if comm is not None:
hw = comm.bcast(hw)
def get_par_float(ddata, key, default):
if key in ddata:
return float(ddata[key])
else:
return float(default)
(
readout_id,
names,
quats,
bands,
wafer_slots,
tube_slots,
nets,
net_corrs,
fknees,
fmins,
alphas,
As,
Cs,
bandcenters,
bandwidths,
ids,
pixels,
fwhms,
pols,
pol_angs,
pol_angs_wafer,
pol_orientations_wafer,
gamma,
wafer_xs,
wafer_ys,
card_slots,
channels,
AMCs,
biases,
readout_freqs,
bondpads,
mux_positions,
tele_wf_band,
) = (
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
[],
)
for det_name, det_data in hw.data["detectors"].items():
readout_id.append(
build_readout_id(
creation_time, det_data["wafer_slot"], det_data["channel"]
)
)
names.append(det_name)
quats.append(np.array([float(x) for x in det_data["quat"]]))
ids.append(int(det_data["ID"]))
# Replace pixel index on the wafer with a unique identifier that
# combines telescope, wafer and pixel index
pixels.append(
f"{meta['telescope']}_{det_data['wafer_slot']}_p{det_data['pixel']}"
)
fwhms.append(float(det_data["fwhm"]) * u.arcmin)
pols.append(det_data["pol"])
pol_angs.append(det_data["pol_ang"] * u.degree)
pol_angs_wafer.append(det_data["pol_ang_wafer"] * u.degree)
pol_orientations_wafer.append(det_data["pol_orientation_wafer"] * u.degree)
gamma.append(det_data["pol_ang"] * u.degree)
wafer_xs.append(det_data["wafer_x_mm"] * u.mm)
wafer_ys.append(det_data["wafer_y_mm"] * u.mm)
card_slots.append(det_data["card_slot"])
channels.append(det_data["channel"])
AMCs.append(det_data["AMC"])
biases.append(det_data["bias"])
readout_freqs.append(float(det_data["readout_freq_GHz"]) * u.GHz)
bondpads.append(det_data["bondpad"])
mux_positions.append(det_data["mux_position"])
# Band is used to retrieve band-averaged values
band = det_data["band"]
band_data = hw.data["bands"][band]
bands.append(band)
# Get the wafer slot and translate into tube slot
wafer_slot = det_data["wafer_slot"]
wafer_slots.append(wafer_slot)
# Determine which tube_slot has this wafer
for tube_slot, tube_data in hw.data["tube_slots"].items():
if wafer_slot in tube_data["wafer_slots"]:
break
else:
raise RuntimeError(f"{wafer_slot} is not in any tube_slot")
tube_slots.append(tube_slot)
tele_wf_band.append(
get_tele_wafer_band_name(meta["telescope"], tube_slot, wafer_slot, band)
)
# Get noise parameters. If detector-specific entries are
# absent, use band averages
nets.append(
get_par_float(det_data, "NET", band_data["NET"])
* 1.0e-6
* u.K
* u.s**0.5
)
net_corrs.append(get_par_float(det_data, "NET_corr", band_data["NET_corr"]))
fknees.append(get_par_float(det_data, "fknee", band_data["fknee"]) * u.mHz)
fmins.append(get_par_float(det_data, "fmin", band_data["fmin"]) * u.mHz)
alphas.append(get_par_float(det_data, "alpha", band_data["alpha"]))
As.append(get_par_float(det_data, "A", band_data["A"]))
Cs.append(get_par_float(det_data, "C", band_data["C"]))
# bandpass
lower = get_par_float(det_data, "low", band_data["low"]) * u.GHz
# center = get_par_float(det_data, "center", band_data["center"]) * u.GHz
upper = get_par_float(det_data, "high", band_data["high"]) * u.GHz
bandcenters.append(0.5 * (lower + upper))
bandwidths.append(upper - lower)
meta["platescale"] = hw.data["telescopes"][meta["telescope"]]["platescale"] \
* u.deg / u.mm
detdata = QTable(
[
readout_id,
names,
ids,
quats,
bands,
card_slots,
wafer_slots,
tube_slots,
fwhms,
nets,
net_corrs,
fknees,
fmins,
alphas,
As,
Cs,
bandcenters,
bandwidths,
pixels,
pols,
pol_angs,
pol_angs_wafer,
pol_orientations_wafer,
gamma,
wafer_xs,
wafer_ys,
channels,
AMCs,
biases,
readout_freqs,
bondpads,
mux_positions,
tele_wf_band,
],
names=[
"readout_id",
"name",
"uid",
"quat",
"band",
"card_slot",
"wafer_slot",
"tube_slot",
"FWHM",
"psd_net",
"NET_corr",
"psd_fknee",
"psd_fmin",
"psd_alpha",
"elevation_noise_a",
"elevation_noise_c",
"bandcenter",
"bandwidth",
"pixel",
"pol",
"pol_ang",
"pol_ang_wafer",
"pol_orientation_wafer",
"gamma",
"wafer_x",
"wafer_y",
"channel",
"AMC",
"bias",
"readout_freq",
"bondpad",
"mux_position",
"tele_wf_band",
],
meta=meta,
)
super().__init__(
detector_data=detdata,
field_of_view=field_of_view,
sample_rate=sample_rate,
)
def update_creation_time(det_data, creation_time):
"""Update the readout_id column of a focalplane with a new creation time.
Args:
det_data (QTable): The detector properties table.
creation_time (float): The updated time for use in the readout_id
Returns:
None
"""
for row in range(len(det_data)):
wf, ct, chan = parse_readout_id(det_data["readout_id"][row])
det_data["readout_id"][row] = build_readout_id(creation_time, wf, chan)
def simulated_telescope(
hw=None,
hwfile=None,
det_info_file=None,
det_info_version=None,
telescope_name=None,
sample_rate=10 * u.Hz,
bands=None,
wafer_slots=None,
tube_slots=None,
thinfp=None,
comm=None,
):
if hw is not None and telescope_name is None:
# get it from the hw
if len(hw.data["telescopes"]) != 1:
raise RuntimeError("Input Hardware has multiple telescopes")
telescope_name = list(hw.data["telescopes"].keys())[0]
focalplane = SOFocalplane(
hw=hw,
hwfile=hwfile,
det_info_file=det_info_file,
det_info_version=det_info_version,
telescope=telescope_name,
sample_rate=sample_rate,
bands=bands,
wafer_slots=wafer_slots,
tube_slots=tube_slots,
thinfp=thinfp,
comm=comm,
)
site = SOSite()
# The focalplane construction above will lookup the telescope name if needed
# from the wafer / tube information
telescope = Telescope(
focalplane.detector_data.meta["telescope"],
focalplane=focalplane,
site=site,
)
return telescope