import so3g.proj
import numpy as np
from pixell import enmap, wcsutils, utils
from . import healpix_utils as hp_utils
import time
import re
DEG = np.pi/180
if hasattr(so3g.proj.quat, "quat"):
g3quat = so3g.proj.quat.quat
else:
g3quat = so3g.proj.quat.Quat
def _get_csl(sight):
"""Return the CelestialSightLine equivalent of sight. If sight is
already of that class, it is returned. If it's a G3VectorQuat or
an array [n,4], those coordinates are wrapped and returned (note a
reference may be taken rather than a copy made).
"""
if isinstance(sight, so3g.proj.CelestialSightLine):
return sight
if isinstance(sight, so3g.proj.quat.G3VectorQuat):
c = so3g.proj.CelestialSightLine()
c.Q = sight
return c
qa = np.asarray(sight, dtype='float')
c = so3g.proj.CelestialSightLine()
c.Q = so3g.proj.quat.G3VectorQuat(qa)
return c
def _valid_arg(*args, src=None):
"""Return some data, possibly extracted from an AxisManager (or
dict...), based on the arguments args. This is to help with
processing function arguments that override a default behavior
which is to look up a thing in axisman. For example::
signal = _valid_arg(signal, 'signal', src=axisman)
is equivalent to::
if signal is None:
signal = 'signal'
if isinstance(signal, str):
signal = axisman[signal]
Similarly::
sight = _valid_arg(sight, src=axisman)
is a shorthand for::
if sight is not None:
if isinstance(sight, string):
sight = axisman[sight]
Each element of args should be either a data vector (non string),
a string, or None. The arguments are processed in order. The
first argument that is not None will cause the function to return;
if that argument k is a string, then axisman[k] is returned;
otherwise k is returned directly. If all arguments are None, then
None is returned.
"""
for a in args:
if a is not None:
if isinstance(a, str):
try:
a = src[a]
except TypeError:
raise TypeError(f"Tried to look up '{a}' in axisman={axisman}")
return a
return None
def _not_both(a, b, name='{item}', dtype=None):
if a is not None:
if b is not None:
raise ValueError('self.%s and kwarg %s both not None!' % (name, name))
b = a
if b is not None and dtype is not None:
b = b.astype(dtype)
return b
class Timer:
"""Context manager that prints elapsed time to terminal or a log or
whatever. For example::
with Timer('tod-to-map projection', logger.info):
P.to_map(tod)
Should log a message of the form::
INFO:tod-to-map projection: 3.212 seconds
Use the fmt keyword argument to enter your own format string,
which can include references to {msg}, {start_time}, {end_time},
{elapsed}.
To avoid repeating the same arguments, subclass this and set
FORMAT and PRINT_FUNC for your use cases::
class MyTimer(Timer):
FORMAT = 'mapmaker7: {msg} took {elapsed:10.3f} seconds'
PRINT_FUNC = lambda x: (logger.info(x), print(x))
"""
FMT = '{msg:40}: {elapsed:10.3f} seconds'
PRINT_FUNC = print
def __init__(self, msg=None, print_func=None, fmt=None):
if msg is None:
msg = 'timed operation'
if fmt is None:
fmt = self.FMT
if print_func is None:
print_func = self.PRINT_FUNC
self.msg = msg
self.fmt = fmt
self.print_func = print_func
self.start_time = time.time()
def __enter__(self):
return self
def __exit__(self, *args, **kw):
self.end_time = time.time()
self.elapsed = self.end_time - self.start_time
self.text = self.fmt.format(
msg=self.msg, start_time=self.start_time, end_time=self.end_time,
elapsed=self.elapsed)
self.print_func(self.text)
def get_fplane(tod, dets=None, focal_plane=None, hwp=False):
"""Return a FocalPlane object for the AxisManager tod.
Args:
dets (list of str): If set, the detectors will be restricted to this list.
focal_plane (AxisManager): A sub-axis-manager containing the detector
layout and response information. Defaults to tod.focal_plane.
This must contain at least the xi and eta members. Optional members are
gamma, T, P, Q and U, which are used to specify the detector response.
Typically these would be used like this:
gamma: Only detector polariztion angle specified. Detectors assumed to
have 100% response in total intensity and polarization.
gamma, P: Detector polarization angle and polarization efficiency.
gamma, T, P: As above, but also including total intensity. This is
usually unnecessary, but in demodulation mapmaking it's sometimes
useful to support polarization-sensitive detectors with no sensitivity
to total intensity.
T, Q, U: Alternative to specifying polarization angles. The detectors'
response to T, Q and U are directly specified in the focal plane frame.
Equivalent to gamma = arctan(U,Q)/2, P = (Q**2+U**2)**0.5.
Returns:
A FocalPlane object, which simply encapsulates a numpy array of detector
quaternions and a numpy array of detector responses to total intensity
and polarization.
"""
dets = _valid_arg(dets, tod.dets.vals, src=tod)
fp = _valid_arg(focal_plane, 'focal_plane', src=tod)
fp = fp.restrict("dets", dets)
kwargs = {key:getattr(fp,key) for key in ["gamma","T","P","Q","U"] if key in fp}
fplane = so3g.proj.FocalPlane.from_xieta(fp.xi, fp.eta, **kwargs, hwp=hwp)
return fplane
[docs]
def get_radec(tod, wrap=False, dets=None, timestamps=None, focal_plane=None,
boresight=None, sight=None):
"""Get the celestial coordinates of all detectors at all times.
Args:
wrap (bool): If True, the output is stored into tod['radec'].
This can also be a string, in which case the result is stored
in tod[wrap].
dets (list of str): If set, then coordinates are only computed
for the requested detectors. Note you probably can't wrap the
result, in this case, as the dets axis is non-concordant.
Returns:
The returned array has shape (n_det, n_samp, 4). The four
components in the last dimension correspond to (lon, lat,
cos(psi), sin(psi)). The lon and lat are in radians and
correspond to RA and dec of equatorial coordinates. Psi is is
the parallactic rotation, measured from North towards West
(opposite the direction of standard Position Angle).
"""
dets = _valid_arg(dets, tod.dets.vals, src=tod)
if sight is None:
timestamps = _valid_arg(timestamps, 'timestamps', src=tod)
boresight = _valid_arg(boresight, 'boresight', src=tod)
sight = so3g.proj.CelestialSightLine.az_el(
timestamps, boresight.az, boresight.el, roll=boresight.roll,
site='so', weather='typical')
else:
sight = _get_csl(_valid_arg(sight, 'sight', src=tod))
fplane = get_fplane(tod, dets=dets, focal_plane=focal_plane)
asm = so3g.proj.Assembly.attach(sight, fplane)
output = np.zeros((len(dets), len(sight.Q), 4))
proj = so3g.proj.Projectionist()
proj.get_coords(asm, output=output)
if wrap is True:
wrap = 'radec'
if wrap:
tod.wrap(wrap, output, [(0, 'dets'), (1, 'samps')])
return output
[docs]
def get_horiz(tod, wrap=False, dets=None, timestamps=None, focal_plane=None,
boresight=None):
"""Get the horizon coordinates of all detectors at all times.
Args:
wrap (bool): If True, the output is stored into tod['horiz'].
This can also be a string, in which case the result is stored
in tod[wrap].
dets (list of str): If set, then coordinates are only computed
for the requested detectors. Note you probably can't wrap the
result, in this case, as the dets axis is non-concordant.
Returns:
The returned array has shape (n_det, n_samp, 4). The four
components in the last dimension correspond to (-lon, lat,
cos(psi), sin(psi)). The -lon and lat are in radians and
correspond to horizon azimuth and elevation. Psi is is
the parallactic rotation, measured from North towards West
(opposite the direction of standard Position Angle).
"""
dets = _valid_arg(dets, tod.dets.vals, src=tod)
timestamps = _valid_arg(timestamps, 'timestamps', src=tod)
boresight = _valid_arg(boresight, 'boresight', src=tod)
sight = so3g.proj.CelestialSightLine.for_horizon(
timestamps, boresight.az, boresight.el, roll=boresight.roll)
fplane = get_fplane(tod, dets=dets, focal_plane=focal_plane)
asm = so3g.proj.Assembly.attach(sight, fplane)
output = np.zeros((len(dets), len(timestamps), 4))
proj = so3g.proj.Projectionist()
proj.get_coords(asm, output=output)
# The lonlat pair is (-az, el), so restore the az sign.
output[:,:,0] *= -1
if wrap is True:
wrap = 'horiz'
if wrap:
tod.wrap(wrap, output, [(0, 'dets'), (1, 'samps')])
return output
[docs]
def get_wcs_kernel(proj, ra=None, dec=None, res=None):
"""Construct a WCS. This fixes the projection type (e.g. CAR, TAN),
centered with respect to a reference point (at ra,dec = (0,0)), and
resolution of a pixelization, without specifying a particular grid
of pixels.
This interface is subject to change.
Args:
proj (str): Either the name of a FITS projection to use (e.g. 'car',
'cea', 'tan'), in which case "res" must also be specified, or a
string containing both the projection and the resolution, in this
format:
proj-res
where proj is a projection type (e.g. 'car', 'tan, 'gnom') and res
is the resolution, in appropriate units (deg, arcmin or arcsec).
Examples of acceptable args:
'car-0.5deg'
'tan-3arcmin'
'gnom-25arcsec'
ra: Right Ascension (longitude) of the reference position, in
radians.
dec: Declination (latitude) of the reference position, in
radians.
res: Resolution, in radians.
Returns a WCS object that captures the requested pixelization.
"""
if len(proj) > 4:
m = re.match('(?P<proj>car|tan|cea|gnom)-(?P<res>[0-9]*.?[0-9]*)(?P<unit>arcsec|arcmin|deg)',
proj)
if m is None:
raise ValueError("Input projection string is incorrectly formatted.")
proj = m['proj']
res = float(m['res'])
unit = m['unit']
# Convert to radians
res = res * np.pi/180.
if unit == 'arcmin':
res /= 60.
if unit == 'arcsec':
res /= 3600.
_, wcs = enmap.geometry(np.array((0, 0)), shape=(1, 1), proj=proj,
res=(res, -res))
else:
assert np.isscalar(res) # This ain't enlib.
_, wcs = enmap.geometry(np.array((dec, ra)), shape=(1, 1), proj=proj,
res=(res, -res))
if proj == 'car':
wcs.wcs.crpix = [1.0, 0.5]
# Check if the resolution is sensible, for SHTs
res = res * 180./np.pi # Easier to do this check in degs
div = 90./res - np.round(90./res)
if abs(div) > 1e-8:
raise ValueError(f"The requested resolution {res} deg does not divide 90 deg evenly.")
return wcs
def get_focal_plane_cover(tod=None, count=0, focal_plane=None,
xieta=None, det_weights=None):
"""Process a bunch of detector positions into a center and radius such
that a circle with that center and radius contains all the
detectors. Also return detector positions, arranged approximately
in a circle, whose convex hull contains all input detectors.
Args:
tod (AxisManager): source of focal_plane, if it is not passed.
count (int): number of points on the enclosing circle to return.
focal_plane (AxisManager): source for xi and eta, if xieta is
not passed in
xieta (array): (2, n) array (or similar) of xi and eta
detector positions.
det_weights (array): If provided, must be same length as the xi
and eta vectors. Only dets with non-zero value for det_weights
will be included in the evaluation of the cover.
Returns:
xieta0: array[2] with array center, (xi0, eta0).
radius: radius of the enclosing circle.
xietas: array with shape (2, count) carrying the xi and eta
coords of the circular convex hull.
Notes:
If count=0, an empty list is returned for xietas. Otherwise,
count must be at least 3 so that the shape is not degenerate.
Any xi, eta that are not finite (e.g. nan or inf) are excluded
from the computation. If no detectors remain after the combined
finiteness and det_weights cuts, a ValueError is raised.
Note that ``det_weights`` can be int, float, or bool
type. Sometimes you might want to only include optical dets in
the result; e.g.::
..., det_weights=(aman.det_info.wafer.type=="OPTC"), ...
In degenerate cases (all dets are in exactly the same place), a
radius of zero may be returned.
"""
if xieta is None:
if focal_plane is None:
focal_plane = tod.focal_plane
xi = focal_plane.xi
eta = focal_plane.eta
else:
xi, eta = xieta[:2]
mask = np.isfinite(xi) * np.isfinite(eta)
if det_weights is not None:
mask *= det_weights.astype(bool)
if not np.any(mask):
raise ValueError('All provided (xi, eta) coords are excluded; '
'cannot estimate a focal plane cover.')
# Restrict to only dets under consideration.
xi, eta = xi[mask], eta[mask]
qs = so3g.proj.quat.rotation_xieta(xi, eta)
# Starting guess for center
xi0, eta0 = xi.mean(), eta.mean()
for i in range(10):
q = so3g.proj.quat.rotation_xieta(xi0, eta0)
d_xi, d_eta, _ = so3g.proj.quat.decompose_xieta(~q * qs)
if abs(d_xi.mean()) + abs(d_eta.mean()) < 1e-5*DEG:
break
xi0 += d_xi.mean()*.8
eta0 += d_eta.mean()*.8
else:
raise ValueError('No convergence?')
R = (d_xi**2 + d_eta**2).max()**.5
if count == 0:
return ((xi0, eta0), R, [])
if count < 3:
raise ValueError('count must be 0 or >=3.')
dphi = 2*np.pi/count
phi = np.arange(count) * dphi
# cos(dphi/2) is the largest underestimate in radius one can make when
# replacing a circle with an n_circ-sided polygon, as we do here.
L = 1.01 * R / np.cos(dphi/2)
xi, eta = L * np.cos(phi), L * np.sin(phi)
# Rotate those into place.
xi, eta, _ = so3g.proj.quat.decompose_xieta(
q * so3g.proj.quat.rotation_xieta(xi, eta))
return (np.array([xi0, eta0]), R, np.array([xi, eta]))
[docs]
def get_supergeom(*geoms, tol=1e-3):
"""Given a set of compatible geometries [(shape0, wcs0), (shape1,
wcs1), ...], return a geometry (shape, wcs) that includes all of
them as a subset.
Each argument can be a geometry (shape, wcs) or an ndmap.
"""
def as_geom(item):
if isinstance(item, enmap.ndmap):
shape, wcs = item.geometry
else:
shape, wcs = item
return (shape[-2:], wcs)
s0, w0 = as_geom(geoms[0])
w0 = w0.deepcopy()
for item in geoms[1:]:
s, w = as_geom(item)
# is_compatible is necessary but not sufficient.
if not wcsutils.is_compatible(w0, w):
raise ValueError('Incompatible wcs: %s <- %s' % (w0, w))
# Depending on the projection, it may be possible to translate
# crval and crpix along each dimension and maintain exact
# pixel center correspondence.
translate = (False, False)
if wcsutils.is_plain(w0):
translate = (True, True)
elif wcsutils.is_cyl(w0) and w0.wcs.crval[1] == 0.:
translate = (True, False)
cdelt = w0.wcs.cdelt
if np.any(abs(w.wcs.cdelt - cdelt) / cdelt > tol):
raise ValueError("CDELT not the same.")
# Determine what shift in w.crpix would make the crval the same.
d_crpix = w.wcs.crpix - w0.wcs.crpix
d_crval = w.wcs.crval - w0.wcs.crval
tweak_crpix = [0, 0]
for axis in [0, 1]:
if d_crval[axis] != 0 and not translate[axis]:
raise ValueError(f"Incompatible CRVAL in axis {axis}")
d = d_crval[axis] / cdelt[axis] - d_crpix[axis]
if abs((d + 0.5) % 1 - 0.5) > tol:
raise ValueError(f"CRVAL not separated by integer pix in axis {axis}.")
tweak_crpix[axis] = int(np.round(d))
d = np.array(tweak_crpix[::-1])
# Position of s in w0?
corner_a = d + [0, 0]
corner_b = d + s
# Super footprint, in w0.
corner_a = np.min([corner_a, [0, 0]], axis=0)
corner_b = np.max([corner_b, s0], axis=0)
# Boost the WCS
w0.wcs.crpix -= corner_a[::-1]
s0 = corner_b - corner_a
return tuple(map(int, s0)), w0
def _confirm_wcs(*wcss):
"""Insist that all arguments are either the same wcs, or None.
Raises a ValueError if more than one argument is a wcs, and not
all wcs agree. Returns either the first valid wcs, or None if
there aren't any.
"""
wcs_to_use = None
for i, wcs in enumerate(wcss):
if wcs is None:
continue
if wcs_to_use is None:
wcs_to_use = wcs
elif not wcsutils.equal(wcs_to_use, wcs):
raise ValueError(
f'The wcs from {i}th item ({wcs}) is discordant with prior ones ({wcs_to_use})')
return wcs_to_use
def _invert_weights_map(weights, eigentol=1e-6, kill_partials=True,
UPLO='U'):
"""Compute an inverse weights matrix, using eigendecomposition methods
that are safe against singular matrices. This is similar to
scipy.linalg.pinvh, but applied to each pixel in a map in an
efficient way.
Args:
weights (array): an array (or ndarray) with at least 2
dimensions, where the leading two dimensions represent
submatrices that are to be inverted; the other dimensions
index "the pixel". Valid shapes would be, for example, (3, 3,
200, 100) or (3, 3, 10231230).
eigentol (float): sets the threshold for keeping eigenvectors.
In each sub-matrix inversion, any eigenvectors whose absolute
values are less than eigentol times the largest absolute
eigenvalue are set to zero (and thus excluded from inclusion
in the inverse).
kill_partials (bool): if True, then pixels where any
eigenvectors are zero (or have been forced to zero) will have
all their eigenvectors forced to zero. Stated another way,
all pixels with singular or nearly singular weights
sub-matrices will be treated as having no weight at all.
UPLO (str): this can be 'U' or 'L', signifying that only the
upper diagonal or lower diagonal (respectively) elements of
each weights sub-matrix should be considered. (This argument
is passed through to np.linalg.eigh.)
Returns:
A matrix with the same shape as weights, but where the submatrix
carried in the first two dimensions is the inverse of the
corresponding submatrix of weights; or possibly a pseudo-inverse
or the zero matrix depending on arguments described above.
"""
# Quick short circuit in trivial case...
if weights.shape[:2] == (1, 1):
iw = np.zeros_like(weights) # yes, this preserves wcs
iw[weights!=0] = 1./weights[weights!=0]
return iw
# Collapse and reindex weights map so it is (npix, n, n).
w = weights.reshape(weights.shape[:2] + (-1,)).transpose(2, 0, 1)
# Get eigendecomposition of each (n, n) sub-matrix
v, U = np.linalg.eigh(w, UPLO)
# Identify acceptable eigenvalues -- reject ones that are non-positive or too
# small relative to max eigenvalue in their pixel.
eig_ok = (v > 0) * (v > v[:,-1:] * eigentol)
# Does one bad eig spoil the basket?
if kill_partials:
eig_ok *= np.all(eig_ok, axis=1)[:,None]
# Force each unacceptable eigenmode to 0.
U *= eig_ok[:,None,:]
# Set bad eigenvalues to 1, to avoid the divide-by-zero.
v[~eig_ok] = 1.
# Compute the effective inverse, U (1/diag(v)) U.T.
A = (U / v[:,None,:])
B = U.transpose(0,2,1)
iw = np.matmul(A, B)
# Reshape the output to match what was passed in.
return iw.transpose(1,2,0).reshape(weights.shape)
def _apply_inverse_weights_map(inverse_weights, target, out=None):
"""Apply a map of matrices to a map of vectors.
Assumes inverse_weights.shape = (a, b, ...) and target.shape =
(b, ...); the result has shape (a, ...).
"""
if out is None:
out = np.empty(inverse_weights.shape[1:],
dtype=target.dtype)
if isinstance(target, enmap.ndmap):
out = enmap.ndmap(out, target.wcs)
# Recall matmul(a, b) operates on the last two axes of (a, b). So
# move axes, and create a second one in target; re-order at end.
iw = np.moveaxis(inverse_weights, (0,1), (-2,-1))
t = np.moveaxis(target[:,None], (0,1), (-2,-1))
out_moved = np.moveaxis(out[:,None], (0,1), (-2,-1))
np.matmul(iw, t, out=out_moved)
return out
[docs]
class ScalarLastQuat(np.ndarray):
"""Wrapper class for numpy arrays carrying quaternions with the ijk1
signature.
The practice in many codes, including TOAST, quaternionarray, and
scipy is to store quaternions in numpy arrays with shape (..., 4),
with the real part of the quaternion at index [..., 3]. In
contrast, spt3g_software inherits the 1ijk from boost.
This class serves two main purposes:
- It can be used to annotate numpy arrays as carrying quaternions
in the ijk1 signature (without disturbing their behavior as
numpy arrays).
- It provides conversion to and from G3 quaternion containers,
including signature correction.
When instantiating an object of this class with a numpy array as
an argument, the object will wrap a view of the array (not a
copy).
When instantiating an object of this class with a G3 quat as an
argument, the object will store a copy of the G3 quat, translated
to ijk1 signature::
q_g3 = spt3g.core.quat(1.,2.,3.,4.)
q_tq = ScalarLastQuat(q_g3)
q_g3_again = q_tq.to_g3()
print(q_g3, q_tq, q_g3_again)
=> (1,2,3,4) [2. 3. 4. 1.] (1,2,3,4)
"""
def __new__(cls, arr):
if isinstance(arr, so3g.proj.quat.G3VectorQuat):
arr = np.asarray(arr)
obj = np.empty(arr.shape)
obj[:,:3] = arr[:,1:]
obj[:,3] = arr[:,0]
elif isinstance(arr, g3quat):
obj = np.array((arr.b, arr.c, arr.d, arr.a))
else:
obj = np.asarray(arr)
return obj.view(cls)
[docs]
def to_g3(self):
"""Return the ijk1-signature equivalent of the enclosed quaternion
array, as a spt3g.core.quat (if 1-d) or a G3VectorQuat (if
2-d).
"""
if self.shape[-1] != 4:
raise ValueError("Last axis must have 4 elements.")
if self.ndim == 1:
b, c, d, a = self[:].astype(float)
return g3quat(a, b, c, d)
if self.ndim == 2:
temp = np.zeros(self.shape, float)
temp[..., 0] = self[..., 3]
temp[..., 1:] = self[..., :3]
return so3g.proj.quat.G3VectorQuat(temp)
raise ValueError("Can only convert 1- or 2-d arrays to G3.")
def get_deflected_sightline(aman, wobble_meta, site='so', weather='typical'):
"""
Constructs a deflected CelestialSightLine using HWP-synchronous
pointing correction using combined wobble metadata that contains
both amp and phase fields.
This function will raise ValueError unless all detectors belong to a single
wafer and frequency band. It extracts the corresponding deflection amplitude
and phase from the metadata, computes the wobble correction quaternion, and
applies it to the boresight pointing.
Parameters
----------
aman : AxisManager
AxisManager for the observation, must include hwp_angle, timestamps,
and boresight.az/el, as well as det_info with wafer and band info.
wobble_meta : AxisManager
Metadata tree containing both amp and phase fields under
wobble_meta.{amp, phase}
site : str
Observatory site identifier for sightline generation (default 'so').
weather : str
Atmospheric condition tag for sightline model (default 'typical').
Returns
-------
sight : CelestialSightLine
The sightline with the wobble correction quaternion applied.
"""
wafer_slots = np.unique(aman.det_info.wafer_slot)
bands = np.unique(aman.det_info.wafer.bandpass)
if len(wafer_slots) != 1 or len(bands) != 1:
raise ValueError("Detectors span multiple wafer_slots or bands.")
# the amp and phase are the same for a given wafer, so we can take any of them, in this case for detector index 0
# !!!!! this won't work for mixing more than one wafer.
# the metadata has amplitudes in arcmin, and phases in radians
amp = wobble_meta.amp[0]/60.*np.pi/180.0
phase = wobble_meta.phase[0]
dxi = amp * np.cos(aman.hwp_angle - phase)
deta = -amp * np.sin(aman.hwp_angle - phase)
deflq = so3g.proj.quat.rotation_xieta(xi=dxi, eta=deta)
sight = so3g.proj.CelestialSightLine.az_el(
aman.timestamps,
aman.boresight.az,
aman.boresight.el,
roll=aman.boresight.roll,
weather=weather,
site=site,
)
sight.Q = sight.Q * ~deflq
return sight
def normalize_geometry(shape, wcs):
"""Analyze (shape, wcs) and return a pixel-compatible (shape, wcs)
that positions the reference pixel in a way that works with so3g projection
routines. Only cylindrical projections are affected.
"""
# Can't freely change wcs for non-separable geometries
# (so non-cylindrical ones), as this would change the geometry
# in an incompatible way
if not wcsutils.is_separable(wcs): return shape, wcs
# The pointing matrix assumes that crval ra is in [-180,180],
# and that all points are within 180° of this
wcs = wcs.deepcopy()
x_mid = shape[-1]/2+1
ra_mid = wcs.wcs.crval[0] + (x_mid-wcs.wcs.crpix[0])*wcs.wcs.cdelt[0]
ra_mid = utils.rewind(ra_mid, ref=0, period=360)
wcs.wcs.crpix[0] = x_mid
wcs.wcs.crval[0] = ra_mid
return shape, wcs