"""update_obsdb.py
Create and/or update an obsdb and obsfiledb based on some books.
The config file could be of the form:
.. code-block:: yaml
base_dir: path_to_base_directories. Can be a list or a single string.
obsdb_cols:
start_time: float
stop_time: float
n_samples: int
telescope: str
tube_slot: str
type: str
subtype: str
obsdb: dummyobsdb.sqlite
obsfiledb: dummyobsfiledb.sqlite
lat_tube_list_file: path to yaml dict matching tubes and bands
tolerate_stray_files: True
known_bad_books_file: path to file listing bad books (one obs_id per line)
extra_extra_files:
- Z_bookbinder_log.txt
extra_files:
- M_index.yaml
- M_book.yaml
"""
from sotodlib.core.metadata import ObsDb
from sotodlib.core import Context
from sotodlib.site_pipeline import check_book
from sotodlib.io import load_book
import os
import fnmatch
import glob
import yaml
import re
import numpy as np
import time
import argparse
import logging
from sotodlib.site_pipeline.utils.logging import init_logger
from typing import Optional
from itertools import product
# Module-level logger used by every function in this file.
# NOTE(review): the two arguments look like a logger name and a message
# prefix -- confirm against init_logger's signature.
logger = init_logger('update_obsdb', 'update-obsdb: ')
# Descriptive obsdb columns for each telescope name that may appear in
# M_index.yaml.  Key order is preserved in the dict handed back to the caller.
_TELESCOPE_FLAVORS = {
    "satp1": {"telescope": "satp1", "telescope_flavor": "sat",
              "tube_flavor": "mf", "detector_flavor": "tes"},
    "satp2": {"telescope": "satp2", "telescope_flavor": "sat",
              "tube_flavor": "uhf", "detector_flavor": "tes"},
    "satp3": {"telescope": "satp3", "telescope_flavor": "sat",
              "tube_flavor": "mf", "detector_flavor": "tes"},
    # No tube_flavor here: main() fills it in per tube slot for the LAT.
    "lat": {"telescope": "lat", "telescope_flavor": "lat",
            "detector_flavor": "tes"},
}

# Alternative spellings mapped onto a canonical telescope name.
_TELESCOPE_ALIASES = {"sat": "satp1"}


def telescope_lookup(telescope: str) -> dict:
    """
    Set a number of common queries given a telescope name

    Arguments
    ----------
    telescope : str
        Name of telescope in M_index

    Returns
    -------
    dict
        Mapping of column name to value ("telescope", "telescope_flavor",
        "detector_flavor" and, for the SATs, "tube_flavor").  An empty dict
        is returned, after logging an error, when the name is not recognised.
        A fresh dict is returned on every call so callers may mutate it.
    """
    canonical = _TELESCOPE_ALIASES.get(telescope, telescope)
    try:
        return dict(_TELESCOPE_FLAVORS[canonical])
    except (KeyError, TypeError):
        # TypeError covers unhashable values, which cannot be a valid name.
        # Report the offending value so the bad book can be tracked down.
        logger.error(
            f"unknown telescope type given by bookbinder: {telescope!r}")
        return {}
def _find_books_deep(basedirs):
    """Recursively search each entry of ``basedirs`` for book directories.

    Every base directory is traversed with ``os.walk`` (the base directory
    itself included).  Each directory that directly holds a file named
    ``M_index.yaml`` is yielded as a path.
    """
    marker = 'M_index.yaml'
    for root in basedirs:
        yield from (
            current
            for current, _subdirs, files in os.walk(root)
            if marker in files
        )
def _find_books_shallow(basedirs):
    """Yield book directories found one level below each base directory.

    Unlike the recursive search, only the immediate subdirectories of every
    entry in ``basedirs`` are checked for ``M_index.yaml``.  Matches from one
    base directory are yielded in sorted order.
    """
    for root in basedirs:
        index_files = glob.glob(root + '/*/M_index.yaml')
        index_files.sort()
        yield from map(os.path.dirname, index_files)
def _load_yaml(path, mode="rb"):
    """Parse the YAML file at ``path``, closing the file handle afterwards."""
    with open(path, mode) as handle:
        return yaml.safe_load(handle)


def _read_known_bad_books(path):
    """Return the obs_ids listed in ``path``, skipping blank and ``#`` lines."""
    try:
        with open(path, "r") as bbf:
            return [line.strip() for line in bbf
                    if line.strip()[:1] not in ("", "#")]
    except Exception as err:
        # Keep the IOError callers already expect, but chain the real cause.
        raise IOError("Failed to read bad books file: " + str(path)) from err


def _ctime_prefix(timestamp):
    """Return ``timestamp // 1e5``: the leading five digits of a ten-digit UNIX time."""
    return max(int(timestamp), 0) // 100000


def main(config: str,
         recency: Optional[float] = None,
         booktype: Optional[str] = "both",
         verbosity: Optional[int] = 2,
         overwrite: Optional[bool] = False,
         fastwalk: Optional[bool] = False,
         limit: Optional[int] = None,
         filter: Optional[str] = None,
         ):
    """
    Create or update an obsdb for observation or operations data.

    Arguments
    ----------
    config : str
        Path to config file
    recency : float
        How far back in time to look for databases, in days. If None,
        goes back to the UNIX start date (default: None)
    booktype : str
        Look for observations or operations data or both (default: both)
    verbosity : int
        Output verbosity. 0:Error, 1:Warning, 2:Info(default), 3:Debug
    overwrite : bool
        if False, do not re-check existing entries
    fastwalk : bool
        if True, assume the directories have a structure /base_dir/obs|oper/\\d{5}/...
        Then replace base_dir with only the directories where \\d{5} is greater or
        equal to recency.
    limit : int
        Limit the number of books to process; helpful for debugging.
        If None or 0, there is no limit.
    filter : str
        fnmatch wildcard pattern; only books whose full path matches it
        are processed. If None, no filtering is applied.

    Raises
    ------
    KeyError
        If the config file has no ``base_dir`` entry.
    IOError
        If the known-bad-books file cannot be read.
    RuntimeError
        After all books were attempted, if at least one could not be added.
    """
    logger.handlers[0].setLevel(logging.DEBUG)
    level = {0: logging.ERROR, 1: logging.WARNING,
             2: logging.INFO, 3: logging.DEBUG}.get(verbosity)
    if level is not None:
        logger.setLevel(level)

    logger.info("Updating obsdb")
    bookcart = []
    if booktype not in ["obs", "oper", "both"]:
        logger.warning("Specified booktype inadapted to update_obsdb")
    accept_type = ["obs", "oper"] if booktype == "both" else [booktype]

    config_dict = _load_yaml(config, "r")
    try:
        base_dir = config_dict["base_dir"]
    except KeyError:
        # Nothing can be searched without a base directory: stop here,
        # before any database is created or modified.
        logger.error("No base directory base_dir specified in config file!")
        raise

    if "obsdb" in config_dict:
        bookcartobsdb = ObsDb(map_file=config_dict["obsdb"])
    else:
        logger.warning("No obsdb named in the configuration file")
        bookcartobsdb = ObsDb("obsdb.sqlite")
    if "obsdb_cols" in config_dict:
        col_list = [col + " " + typ
                    for col, typ in config_dict["obsdb_cols"].items()]
        bookcartobsdb.add_obs_columns(col_list)

    # The list is stored in config_dict because config_dict is handed to
    # check_book below; a set copy gives fast membership tests here.
    config_dict["known_bad_books"] = []
    if config_dict.get("known_bad_books_file"):
        config_dict["known_bad_books"] = _read_known_bad_books(
            config_dict["known_bad_books_file"])
    known_bad = set(config_dict["known_bad_books"])

    # How far back we should look
    tnow = time.time()
    if recency is not None:
        tback = tnow - recency * 86400
    else:
        tback = 0  # Back to the UNIX Big Bang

    existing = set(bookcartobsdb.query()["obs_id"])

    # Check if there are one or multiple base_dir specified
    if isinstance(base_dir, str):
        base_dir = [base_dir]

    if fastwalk:
        # Integer division gives the directory code even for timestamps
        # with fewer than ten digits (slicing the string did not).
        abv_tback = _ctime_prefix(tback)
        abv_tnow = _ctime_prefix(tnow)
        # Build the combinations base_dir/booktype/\d{5}
        bd_fmt = "{base_dir}/{obs_type}/{abv_code}"
        logger.info(f"Search path limited to {bd_fmt}")
        logger.info(f" where base_dir in {base_dir}")
        logger.info(f" and obs_type in {accept_type}")
        logger.info(f" and abv_code in [{abv_tback}-{abv_tnow}]")
        base_dir = [
            os.path.join(bdir, btype, str(code))
            for bdir, btype, code in product(
                base_dir, accept_type, range(abv_tback, abv_tnow + 1))
        ]
        book_cand_iterable = _find_books_shallow(base_dir)
    else:
        book_cand_iterable = _find_books_deep(base_dir)

    skipped_count = 0
    for dirpath in book_cand_iterable:
        book_id = os.path.basename(dirpath)
        if book_id in existing and not overwrite:
            continue
        if book_id in known_bad:
            logger.debug(f"{book_id} known to be bad, skipping it")
            skipped_count += 1
            continue
        if os.path.getmtime(dirpath) > tback:
            # Looks like a book folder and edited recently enough
            bookcart.append(dirpath)

    logger.info(f"Found {len(bookcart)} new books in {time.time()-tnow} s")
    if skipped_count:
        logger.info(f"(Excluded {skipped_count} known bad books.)")

    if filter is not None:
        bookcart = [bookpath for bookpath in bookcart
                    if fnmatch.fnmatch(bookpath, filter)]
        logger.info(f"Filtering to match pattern {filter}: leaves {len(bookcart)} items.")

    # Check the books for the observations we want
    op_counter = 0
    bad_book_counter = 0
    lat_tube_list = None  # loaded on first LAT book, then reused
    for bookpath in sorted(bookcart):
        if limit and op_counter >= limit:
            break
        # NOTE(review): check_meta_type is neither defined nor imported in
        # the visible part of this file -- confirm where it comes from.
        if check_meta_type(bookpath) not in accept_type:
            continue

        t1 = time.time()
        op_counter += 1
        logger.info(f"Starting processing of book at {bookpath}")
        try:
            # obsfiledb creation
            ok, obsfiledb_info = check_book.scan_book_dir(
                bookpath, logger, config_dict, prep_obsfiledb=True)
            if not ok:
                raise RuntimeError("check_book found fatal errors, not adding.")
            check_book.add_to_obsfiledb(
                obsfiledb_info, logger, config_dict, overwrite=True)
            logger.info(f"Ran check_book in {time.time()-t1} s")
        except Exception as e:
            logger.error(f"Failed to add {bookpath}. Error: {e}")
            bad_book_counter += 1
            continue

        index = _load_yaml(os.path.join(bookpath, "M_index.yaml"))
        obs_id = index.pop("book_id")
        tags = index.pop("tags")
        # Popped so that it is not picked up as an obsdb column below.
        index.pop("detsets")

        if "obsdb_cols" in config_dict:
            very_clean = {col: index[col]
                          for col in config_dict["obsdb_cols"] if col in index}
        else:
            # Keep every scalar, non-null entry and create matching columns.
            very_clean = {key: val for key, val in index.items()
                          if val is not None and type(val) is not list}
            bookcartobsdb.add_obs_columns(
                [key + " " + type(val).__name__
                 for key, val in very_clean.items()])

        # Adding info that should be there for all observations
        # Descriptive string columns
        try:
            telescope = index["telescope"]
            flavors = telescope_lookup(telescope)
            if flavors:
                bookcartobsdb.add_obs_columns(
                    [flav + " str" for flav in flavors])
                very_clean.update(flavors)
            if telescope == "lat":
                if lat_tube_list is None:
                    lat_tube_list = _load_yaml(
                        config_dict["lat_tube_list_file"])
                tube_flavor = lat_tube_list[index["tube_slot"]]
                bookcartobsdb.add_obs_columns(["tube_flavor str"])
                very_clean["tube_flavor"] = tube_flavor
        except KeyError:
            logger.error("No telescope key in index file or error with lat_tube_list")
            # Make sure the column exists before a value is written to it.
            bookcartobsdb.add_obs_columns(["telescope_flavor str"])
            very_clean["telescope_flavor"] = "unknown"
        # None when telescope_lookup did not recognise the telescope; the
        # flavor-specific sections below are then skipped instead of
        # raising KeyError.
        telescope_flavor = very_clean.get("telescope_flavor")

        # Stream_ids and wafers
        stream_ids = None  # reset so a previous book's value is never reused
        try:
            stream_ids = index["stream_ids"]
            wafer_slots = index["wafer_slots"]
            matched = [slot for slot in wafer_slots
                       if slot["stream_id"] in stream_ids]
            linked_ids = {slot["stream_id"] for slot in matched}
            wafer_slots_list = ",".join(slot["wafer_slot"] for slot in matched)
            stream_ids_list = ",".join(stream_ids)
            very_clean["wafer_count"] = len(matched)
            very_clean["wafer_slots_list"] = wafer_slots_list
            very_clean["stream_ids_list"] = stream_ids_list
            bad_streams = [sid for sid in stream_ids if sid not in linked_ids]
            if bad_streams:
                # Some stream_ids not linked to wafers.
                bad_streams_string = ", ".join(map(str, bad_streams))
                logger.error(f"Missing info on some stream_ids: {bad_streams_string}")
                bad_book_counter += 1
                continue
            bookcartobsdb.add_obs_columns(
                ["wafer_count int", "wafer_slots_list str", "stream_ids_list str"])
        except KeyError:
            logger.error("Unable to find stream_ids or wafer slots")

        # Time
        try:
            start = index["start_time"]
            end = index["stop_time"]
            bookcartobsdb.add_obs_columns(["timestamp float", "duration float"])
            very_clean["timestamp"] = start
            very_clean["duration"] = end - start
        except KeyError:
            logger.error(f"Incomplete timing information for obs_id {obs_id}")

        # SAT HWP
        if telescope_flavor == "sat":
            try:
                very_clean["hwp_freq_mean"] = index["hwp_freq_mean"]
                very_clean["hwp_freq_stdev"] = index["hwp_freq_stdev"]
                bookcartobsdb.add_obs_columns(["hwp_freq_mean float",
                                               "hwp_freq_stdev float"])
            except KeyError:
                logger.error(f"No HWP frequency info for obs_id {obs_id}")

        # Scanning motion
        if stream_ids:
            stream_file = os.path.join(
                bookpath, "*{}*.g3".format(stream_ids[0]))
            stream = load_book.load_book_file(stream_file, no_signal=True)
        else:
            # Without a stream_id there is no file to read pointing from.
            logger.error(f"No stream_ids available to load pointing for obs_id {obs_id}")
            stream = None

        if stream is not None:
            for coor in ["az", "el"]:
                try:
                    coor_enc = stream.ancil[coor + "_enc"]
                    bookcartobsdb.add_obs_columns([f"{coor}_center float",
                                                   f"{coor}_throw float"])
                    very_clean[f"{coor}_center"] = round(.5 * (coor_enc.max() + coor_enc.min()), 4)
                    very_clean[f"{coor}_throw"] = round(.5 * (coor_enc.max() - coor_enc.min()), 4)
                except KeyError:
                    logger.error(f"No {coor} pointing in some streams for obs_id {obs_id}")
            try:
                if telescope_flavor == "sat":
                    bore_enc = stream.ancil["boresight_enc"]
                    very_clean["roll_center"] = round(-.5 * (bore_enc.max() + bore_enc.min()), 4)
                    very_clean["roll_throw"] = round(.5 * (bore_enc.max() - bore_enc.min()), 4)
                if telescope_flavor == "lat":
                    el_enc = stream.ancil["el_enc"]
                    corot_enc = stream.ancil["corotator_enc"]
                    roll = el_enc - 60. - corot_enc
                    very_clean["roll_center"] = round(.5 * (roll.max() + roll.min()), 4)
                    very_clean["roll_throw"] = round(.5 * (roll.max() - roll.min()), 4)
                bookcartobsdb.add_obs_columns(["roll_center float", "roll_throw float"])
            except KeyError:
                logger.error(f"Unable to compute roll for obs_id {obs_id}")

        # Make sure no invalid tags before update.
        tags = [t.strip() for t in tags if t.strip() != '']
        bookcartobsdb.update_obs(obs_id, very_clean, tags=tags)
        logger.info(f"Finished {obs_id} in {time.time()-t1} s")

    if bad_book_counter != 0:
        failure_msg = f"Failed to add {bad_book_counter} books."
        logger.error(failure_msg)
        raise RuntimeError(failure_msg)
def get_parser(parser=None):
    """Return an argparse parser carrying the update_obsdb options.

    Arguments
    ----------
    parser : argparse.ArgumentParser, optional
        Existing parser to add the options to.  A new one is created
        when None is given.

    Returns
    -------
    argparse.ArgumentParser
        The parser that received the options (the one passed in, if any).
    """
    cli = argparse.ArgumentParser() if parser is None else parser
    # One (flag, add_argument keywords) pair per option, in display order.
    option_specs = (
        ("--config", dict(
            help="ObsDb, ObsfileDb configuration file",
            type=str, required=True)),
        ("--recency", dict(
            default=None, type=float,
            help="Days to subtract from now to set as minimum ctime. If None, no minimum")),
        ("--verbosity", dict(
            default=2, type=int,
            help="Increase output verbosity. 0:Error, 1:Warning, 2:Info(default), 3:Debug")),
        ("--booktype", dict(
            default="both", type=str,
            help="Select book type to look for: obs, oper, both(default)")),
        ("--overwrite", dict(
            action="store_true",
            help="If true, writes over existing entries")),
        ("--fastwalk", dict(
            action="store_true",
            help="Assume known directory tree shape and speed up walkthrough")),
        ("--limit", dict(
            type=int,
            help="Limit processing to only this number of book candidates.")),
        ("--filter", dict(
            type=str, default=None,
            help="Limit processing to books (full path) matching this fnmatch wildcard string.")),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli
if __name__ == '__main__':
    # Script entry point: parse the command line and forward every option
    # to main() as a keyword argument.
    cli_args = get_parser().parse_args()
    main(**vars(cli_args))