import numpy as np
import astropy.units as u
import named_arrays as na
from .._spectrograph import SpectrographObservation
__all__ = [
"average",
"model_background",
"model_spectral_line",
"model_total",
"fit",
"subtract_spectral_line",
"smooth",
"estimate",
]
[docs]
def average(
obs: SpectrographObservation,
axis: str | tuple[str, ...],
) -> SpectrographObservation:
"""
Compute the average along the given axis using :func:`numpy.nanmedian`
Parameters
----------
obs
The observation to be averaged.
axis
The logical axis along which to take the average.
Examples
--------
Download an IRIS spectrograph observation and compute the average along
the raster average.
.. jupyter-execute::
import matplotlib.pyplot as plt
import astropy.visualization
import named_arrays as na
import iris
# Load a spectrograph observation
obs = iris.sg.SpectrographObservation.from_time_range(
time_start=astropy.time.Time("2021-09-23T06:00"),
time_stop=astropy.time.Time("2021-09-23T07:00"),
)
# Save the time and raster axes
axis = (obs.axis_time, obs.axis_detector_x)
# Compute the average along the time and raster axes
avg = iris.sg.background.average(
obs=obs,
axis=axis,
)
# Plot the result
with astropy.visualization.quantity_support():
fig, ax = plt.subplots()
img = na.plt.pcolormesh(
avg.inputs.wavelength,
avg.inputs.position.y,
C=avg.outputs.value,
)
ax.set_xlabel(f"wavelength ({ax.get_xlabel()})")
ax.set_ylabel(f"helioprojective $y$ ({ax.get_ylabel()})")
fig.colorbar(
mappable=img.ndarray.item(),
ax=ax,
label=f"average spectral radiance ({obs.outputs.unit})",
)
"""
obs = obs.copy_shallow()
shape = obs.inputs.shape
obs.inputs = obs.inputs.explicit.replace(
time=obs.inputs.time.ndarray.jd.mean(),
wavelength=obs.inputs.wavelength.broadcast_to(shape).mean(axis),
position=obs.inputs.position.broadcast_to(shape).mean(axis),
)
obs.outputs = np.nanmedian(obs.outputs, axis=axis)
return obs
[docs]
def model_background(
velocity: na.AbstractScalar,
bias: u.Quantity | na.AbstractScalar,
slope: u.Quantity | na.AbstractScalar,
) -> na.AbstractScalar:
r"""
A simple linear model of the background in the proximity of a spectral
line, given by
.. math::
y(v) = m v + b
where :math:`m` is the slope,
:math:`v` is the velocity,
and :math:`b` is the bias.
Parameters
----------
velocity
The test points at which to evaluate the model background.
bias
The :math:`y`-intercept of the model.
slope
The linear slope of the model.
"""
return slope * velocity + bias
[docs]
def model_spectral_line(
velocity: na.AbstractScalar,
amplitude: na.AbstractScalar,
shift: na.AbstractScalar,
width: na.AbstractScalar,
kappa: na.AbstractScalar,
) -> na.AbstractScalar:
r"""
A simple model of a spectral line profile.
This function uses a kappa distribution,
.. math::
y(v) = A \left( 1 + \frac{(v - v_0)^2}{\kappa \sigma^2} \right)^{-\kappa - 1},
to model the spectral line profile,
where :math:`A` is the amplitude,
:math:`v` is the velocity,
:math:`v_0` is the shift,
:math:`\sigma` is the width,
and :math:`\kappa` is a parameter which controls the thickness of the tails.
Parameters
----------
velocity
The test points at which to evaluate the model background.
amplitude
The height of the spectral line.
shift
The center of the spectral line.
width
The width of the spectral line.
kappa
The fatness of the tails of the spectral line.
"""
x_squared = np.square(velocity - shift)
width_squared = np.square(width)
y = amplitude * (1 + x_squared / (kappa * width_squared)) ** (-kappa - 1)
return y
[docs]
def model_total(
velocity: na.AbstractScalar,
amplitude: na.AbstractScalar,
shift: na.AbstractScalar,
width: na.AbstractScalar,
kappa: na.AbstractScalar,
bias: u.Quantity | na.AbstractScalar,
slope: u.Quantity | na.AbstractScalar,
) -> na.AbstractScalar:
"""
The sum of :func:`model_background` and :func:`model_spectral_line`.
Parameters
----------
velocity
The test points at which to evaluate the model background.
amplitude
The height of the spectral line.
shift
The center of the spectral line.
width
The width of the spectral line.
kappa
The fatness of the tails of the spectral line.
bias
The :math:`y`-intercept of the model.
slope
The linear slope of the model.
"""
line = model_spectral_line(
velocity=velocity,
amplitude=amplitude,
shift=shift,
width=width,
kappa=kappa,
)
bg = model_background(
velocity=velocity,
bias=bias,
slope=slope,
)
return line + bg
def _objective(
data: na.AbstractScalar,
velocity: na.AbstractScalar,
axis_wavelength: str,
amplitude: na.AbstractScalar,
shift: na.AbstractScalar,
width: na.AbstractScalar,
kappa: na.AbstractScalar,
bias: u.Quantity | na.AbstractScalar,
slope: u.Quantity | na.AbstractScalar,
where: bool | na.AbstractScalar = True,
) -> na.AbstractScalar:
"""
The function which is minimized during background fitting
Parameters
----------
data
The observation to be fitted.
velocity
The Doppler velocity for each point in the observation.
axis_wavelength
The logical axis corresponding to increasing wavelength (velocity).
amplitude
The height of the spectral line.
shift
The center of the spectral line.
width
The width of the spectral line.
kappa
The fatness of the tails of the spectral line.
bias
The :math:`y`-intercept of the model.
slope
The linear slope of the model.
where
The points in the observation to consider when fitting.
"""
model = model_total(
velocity=velocity,
amplitude=amplitude,
shift=shift,
width=width,
kappa=kappa,
bias=bias,
slope=slope,
)
diff = data - model
result = np.sqrt(np.sum(np.square(diff), axis=axis_wavelength, where=where))
return result
[docs]
def fit(
data: na.AbstractScalar,
velocity: na.AbstractScalar,
axis_wavelength: str,
where: bool | na.AbstractScalar = True,
) -> na.CartesianNdVectorArray:
"""
Compute the parameters of :func:`model_total` which best fit `data` using
the gradient descent method.
Parameters
----------
data
The observation to be fitted.
velocity
The Doppler velocity for each point in the observation
axis_wavelength
The logical axis corresponding to increasing wavelength (velocity).
where
The points in the observation to consider when fitting.
Examples
--------
Fit a spectrograph image and display the average actual line profile
compared to the average fitted line profile.
.. jupyter-execute::
import numpy as np
import matplotlib.pyplot as plt
import astropy.units as u
import astropy.visualization
import named_arrays as na
import iris
# Load a spectrograph observation
obs = iris.sg.SpectrographObservation.from_time_range(
time_start=astropy.time.Time("2021-09-23T06:00"),
time_stop=astropy.time.Time("2021-09-23T07:00"),
)
# Save the time and raster axes
axis = (obs.axis_time, obs.axis_detector_x)
# Compute the average along the time and raster axes
avg = iris.sg.background.average(
obs=obs,
axis=axis,
)
# Ignore line profiles that are mostly NaN
where_crop = np.isfinite(avg.outputs).mean(avg.axis_wavelength) > 0.7
data = avg.outputs[where_crop]
# Convert wavelength to velocity units
velocity = avg.inputs.velocity.cell_centers()
velocity = velocity[where_crop]
# Fit the data within +/- 150 km/s of line center
where = np.abs(velocity) < 150 * u.km / u.s
parameters = iris.sg.background.fit(
data=data,
velocity=velocity,
axis_wavelength=obs.axis_wavelength,
where=where,
)
# Evaluate the model with the best-fit parameters
data_fit = iris.sg.background.model_total(
velocity=velocity,
amplitude=parameters.components["amplitude"],
shift=parameters.components["shift"],
width=parameters.components["width"],
kappa=parameters.components["kappa"],
bias=parameters.components["bias"],
slope=parameters.components["slope"],
)
# Plot the average data and model
with astropy.visualization.quantity_support():
fig, ax = plt.subplots()
na.plt.plot(
velocity.mean(obs.axis_detector_y),
data.mean(obs.axis_detector_y),
label="data",
)
na.plt.plot(
velocity.mean(obs.axis_detector_y),
data_fit.mean(obs.axis_detector_y),
label="fit",
)
na.plt.plot(
velocity.mean(obs.axis_detector_y),
(data - data_fit).mean(obs.axis_detector_y),
label="difference"
)
ax.set_xlim(-200, 200)
ax.set_xlabel(f"Doppler velocity ({ax.get_xlabel()})")
ax.set_ylabel(f"mean spectral radiance ({ax.get_ylabel()})")
ax.legend();
"""
def function(x: na.CartesianNdVectorArray):
x = x.components
return _objective(
data=data,
velocity=velocity,
axis_wavelength=axis_wavelength,
amplitude=x["amplitude"],
shift=x["shift"],
width=x["width"],
kappa=x["kappa"],
bias=x["bias"],
slope=x["slope"],
where=where,
)
data_nan = np.where(where, data, np.nan)
guess = dict(
# amplitude=np.percentile(img, 99, axis=axis_wavelength),
amplitude=np.nanpercentile(
a=data_nan,
q=99.9,
axis=axis_wavelength,
),
shift=0 * u.km / u.s,
width=20 * u.km / u.s,
kappa=1.1,
bias=0 * u.DN,
slope=0 * u.DN / (u.km / u.s),
)
guess = na.CartesianNdVectorArray.from_components(guess)
result = na.optimize.minimum_gradient_descent(
function=function,
guess=guess,
step_size=na.CartesianNdVectorArray.from_components(
components=dict(
amplitude=0.01 * u.DN,
shift=0.01 * (u.km / u.s) ** 2 / u.DN,
width=0.01 * (u.km / u.s) ** 2 / u.DN,
bias=0.001 * u.DN,
kappa=0.0001 / u.DN,
slope=1e-6 * u.DN / (u.km / u.s) ** 2,
),
),
momentum=0.9,
min_gradient=na.CartesianNdVectorArray.from_components(
components=dict(
amplitude=0.5,
shift=0.5 * u.DN / (u.km / u.s),
width=0.5 * u.DN / (u.km / u.s),
kappa=0.5 * u.DN,
bias=1,
slope=110 * (u.km / u.s),
)
),
max_iterations=10000,
)
return result
[docs]
def subtract_spectral_line(
obs: SpectrographObservation,
axis_wavelength: str,
) -> SpectrographObservation:
"""
Fit `obs` using :func:`fit` and subtract the :func:`model_spectral_line`
component.
Parameters
----------
obs
The observation to remove the spectral line from.
axis_wavelength
The logical axis corresponding to increasing wavelength.
Examples
--------
Subtract the spectral line from an averaged spectrograph observation.
.. jupyter-execute::
import matplotlib.pyplot as plt
import astropy.units as u
import astropy.visualization
import named_arrays as na
import iris
# Load a spectrograph observation
obs = iris.sg.SpectrographObservation.from_time_range(
time_start=astropy.time.Time("2021-09-23T06:00"),
time_stop=astropy.time.Time("2021-09-23T07:00"),
)
# Save the time and raster axes
axis = (obs.axis_time, obs.axis_detector_x)
# Compute the average along the time and raster axes
avg = iris.sg.background.average(
obs=obs,
axis=axis,
)
# Subtract the spectral line from the average
bg = iris.sg.background.subtract_spectral_line(
obs=avg,
axis_wavelength=obs.axis_wavelength,
)
# Plot the result
with astropy.visualization.quantity_support():
fig, ax = plt.subplots()
img = na.plt.pcolormesh(
bg.inputs.wavelength,
bg.inputs.position.y,
C=bg.outputs.value,
vmin=-5,
vmax=+5,
)
ax.set_xlabel(f"wavelength ({ax.get_xlabel()})")
ax.set_ylabel(f"helioprojective $y$ ({ax.get_ylabel()})")
fig.colorbar(
mappable=img.ndarray.item(),
ax=ax,
label=f"average spectral radiance ({obs.outputs.unit})",
)
"""
where_crop = np.isfinite(obs.outputs).mean(obs.axis_wavelength) > 0.7
velocity = obs.inputs.velocity.cell_centers()
velocity = velocity[where_crop]
where = np.abs(velocity) < 150 * u.km / u.s
data = obs.outputs[where_crop]
parameters = fit(
data=data,
velocity=velocity,
axis_wavelength=axis_wavelength,
where=where,
)
model_fit_line = model_spectral_line(
velocity=velocity,
amplitude=parameters.components["amplitude"],
shift=parameters.components["shift"],
width=parameters.components["width"],
kappa=parameters.components["kappa"],
)
obs = obs.copy_shallow()
obs.outputs[where_crop] = data - model_fit_line
return obs
[docs]
def smooth(
obs: SpectrographObservation,
axis_wavelength: str,
axis_detector_y: str,
) -> SpectrographObservation:
"""
Smooth the given observation using
:func:`named_arrays.ndfilters.trimmed_mean_filter`.
Parameters
----------
obs
The observation to be smoothed.
axis_wavelength
The logical axis corresponding to increasing wavelength.
axis_detector_y
The logical axis corresponding to increasing position along the slit.
Examples
--------
Compute a smoothed version of an average spectrograph observation.
.. jupyter-execute::
import matplotlib.pyplot as plt
import astropy.units as u
import astropy.visualization
import named_arrays as na
import iris
# Load a spectrograph observation
obs = iris.sg.SpectrographObservation.from_time_range(
time_start=astropy.time.Time("2021-09-23T06:00"),
time_stop=astropy.time.Time("2021-09-23T07:00"),
)
# Save the time and raster axes
axis = (obs.axis_time, obs.axis_detector_x)
# Compute the average along the time and raster axes
avg = iris.sg.background.average(
obs=obs,
axis=axis,
)
# Subtract the spectral line from the average
bg = iris.sg.background.smooth(
obs=avg,
axis_wavelength=obs.axis_wavelength,
axis_detector_y=obs.axis_detector_y,
)
# Plot the result
with astropy.visualization.quantity_support():
fig, ax = plt.subplots()
img = na.plt.pcolormesh(
bg.inputs.wavelength,
bg.inputs.position.y,
C=bg.outputs.value,
)
ax.set_xlabel(f"wavelength ({ax.get_xlabel()})")
ax.set_ylabel(f"helioprojective $y$ ({ax.get_ylabel()})")
fig.colorbar(
mappable=img.ndarray.item(),
ax=ax,
label=f"average spectral radiance ({obs.outputs.unit})",
)
"""
obs.outputs = na.ndfilters.trimmed_mean_filter(
array=obs.outputs,
size={axis_wavelength: 21},
where=np.isfinite(obs.outputs),
mode="truncate",
proportion=0.2,
)
obs.outputs = na.ndfilters.trimmed_mean_filter(
array=obs.outputs,
size={axis_detector_y: 21},
where=np.isfinite(obs.outputs),
mode="truncate",
proportion=0.2,
)
return obs
[docs]
def estimate(
obs: SpectrographObservation,
axis_time: str,
axis_wavelength: str,
axis_detector_x: str,
axis_detector_y: str,
) -> SpectrographObservation:
"""
Estimate the background from a given spectrograph observation.
This function applies :func:`average`, :func:`subtract_spectral_line`,
and :func:`smooth` in succession to estimate the background.
Parameters
----------
obs
The observation to estimate the background from.
axis_time
The logical axis corresponding to changing raster number.
axis_wavelength
The logical axis corresponding to changing wavelength.
axis_detector_x
The logical axis corresponding the changing position perpendicular to
the slit.
axis_detector_y
The logical axis corresponding to changing position along the slit.
Examples
--------
Estimate the background from a spectrograph observation
.. jupyter-execute::
import numpy as np
import matplotlib.pyplot as plt
import astropy.units as u
import astropy.visualization
import named_arrays as na
import iris
# Load a spectrograph observation
obs = iris.sg.SpectrographObservation.from_time_range(
time_start=astropy.time.Time("2021-09-23T06:00"),
time_stop=astropy.time.Time("2021-09-23T07:00"),
)
# Subtract the spectral line from the average
bg = iris.sg.background.estimate(
obs=obs,
axis_time=obs.axis_time,
axis_wavelength=obs.axis_wavelength,
axis_detector_x=obs.axis_detector_x,
axis_detector_y=obs.axis_detector_y,
)
# Plot the result
with astropy.visualization.quantity_support():
fig, ax = plt.subplots()
img = na.plt.pcolormesh(
bg.inputs.wavelength,
bg.inputs.position.y,
C=bg.outputs.value,
)
ax.set_xlabel(f"wavelength ({ax.get_xlabel()})")
ax.set_ylabel(f"helioprojective $y$ ({ax.get_ylabel()})")
fig.colorbar(
mappable=img.ndarray.item(),
ax=ax,
label=f"average spectral radiance ({obs.outputs.unit})",
)
|
Subtract the background from the observation and compare to the original.
.. jupyter-execute::
# Remove background from spectrograph observation
obs_nobg = obs.copy_shallow()
obs_nobg.outputs = obs.outputs - bg.outputs
# Select the first raster to plot
index = {obs.axis_time: 0}
# Plot the original compared to the background-subtracted
# observation.
with astropy.visualization.quantity_support():
fig, ax = plt.subplots(
ncols=2,
sharex=True,
sharey=True,
constrained_layout=True,
)
mappable = plt.cm.ScalarMappable(
norm=plt.Normalize(vmin=0, vmax=5),
)
ax[0].set_title("original")
na.plt.pcolormesh(
obs.inputs.position.x[index],
obs.inputs.position.y[index],
C=np.nanmean(obs.outputs.value[index], axis=obs.axis_wavelength),
ax=ax[0],
norm=mappable.norm,
cmap=mappable.cmap,
)
ax[1].set_title("corrected")
na.plt.pcolormesh(
obs_nobg.inputs.position.x[index],
obs_nobg.inputs.position.y[index],
C=np.nanmean(obs_nobg.outputs.value[index], axis=obs.axis_wavelength),
ax=ax[1],
norm=mappable.norm,
cmap=mappable.cmap,
)
ax[0].set_xlabel(f"helioprojective $x$ ({ax[0].get_xlabel()})")
ax[0].set_ylabel(f"helioprojective $y$ ({ax[0].get_ylabel()})")
fig.colorbar(
mappable=mappable,
ax=ax,
label=f"mean spectral radiance ({obs.outputs.unit:latex_inline})",
)
|
Plot the original and corrected median spectral line profiles
.. jupyter-execute::
# Define the axes to average over
axis_txy = (
obs.axis_time,
obs.axis_detector_x,
obs.axis_detector_y,
)
# Plot the result
with astropy.visualization.quantity_support():
fig, ax = plt.subplots()
na.plt.stairs(
obs.inputs.wavelength.mean(obs.axis_time),
np.nanmedian(obs.outputs, axis=axis_txy),
label="original",
)
na.plt.stairs(
obs_nobg.inputs.wavelength.mean(obs.axis_time),
np.nanmedian(obs_nobg.outputs, axis=axis_txy),
label="corrected",
)
ax.set_xlabel(f"Doppler velocity ({ax.get_xlabel()})")
ax.set_ylabel(f"median spectral radiance ({ax.get_ylabel()})")
ax.set_ylim(top=10)
ax.legend()
"""
avg = average(
obs=obs,
axis=(axis_time, axis_detector_x),
)
bg_0 = subtract_spectral_line(
obs=avg,
axis_wavelength=axis_wavelength,
)
bg_1 = smooth(
obs=bg_0,
axis_wavelength=axis_wavelength,
axis_detector_y=axis_detector_y,
)
return bg_1