"""Core emission-line fitting engine.
The main entry point is :func:`fit_lines`, which:
1. Subtracts a polynomial continuum.
2. Sets up Gaussian line models with resolution-aware widths.
3. Optimises via ``scipy.optimize.least_squares`` with bounds.
4. Returns a :class:`FitResult` with per-line fluxes, EWs, and SNR.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from math import sqrt, pi
from pathlib import Path
from typing import Callable
import numpy as np
from scipy.optimize import least_squares
from .constraints import ConstraintSet
from .continuum import fit_continuum
from .io import Spectrum
from .lines import REST_LINES_A, get_line_list, observable_lines
from .models import asymmetric_gaussian, build_model, gaussian_binned, pixel_weight
from .resolution import resolve_R, sigma_inst_A
logger = logging.getLogger(__name__)
_SQRT2PI = sqrt(2.0 * pi)
[docs]
@dataclass
class LineResult:
"""Fit result for a single emission line.
Attributes
----------
name : str
Line name.
rest_wave_A : float
Rest-frame wavelength (Å).
amplitude : float
Best-fit amplitude (flux × Å, in flux-density units × Å).
centroid_A : float
Best-fit observed centroid (Å).
sigma_A : float
Best-fit observed Gaussian σ (Å).
flux : float
Integrated line flux (amplitude, since profile is area-normalised).
flux_err : float
Bootstrap uncertainty on flux.
ew_A : float
Rest-frame equivalent width (Å).
snr_int_err : float
Integrated SNR: flux / bootstrap flux error.
snr_int_cont : float
Integrated SNR: flux / (local continuum RMS × √N_pix).
snr_peak_err : float
Peak SNR: model peak / uncertainty array at peak pixel.
snr_peak_cont : float
Peak SNR: model peak / local continuum RMS.
"""
name: str
rest_wave_A: float
amplitude: float
centroid_A: float
sigma_A: float
flux: float
flux_err: float
ew_A: float
snr_int_err: float
snr_int_cont: float
snr_peak_err: float
snr_peak_cont: float
[docs]
def upper_limit(self, n_sigma: float = 3.0) -> float:
"""Return n-sigma flux upper limit."""
return n_sigma * self.flux_err
@property
def snr(self) -> float:
"""Backward-compatible alias for snr_int_err."""
return self.snr_int_err
[docs]
@dataclass
class FitResult:
"""Container for a complete line-fit result.
Attributes
----------
lines : dict of LineResult
Per-line results, keyed by line name.
params : np.ndarray
Full best-fit parameter vector.
model_flux : np.ndarray
Best-fit emission-line model (continuum-subtracted flux density).
continuum : np.ndarray
Best-fit continuum (µJy).
residuals : np.ndarray
Fit residuals (data − continuum − model), in µJy.
chi2 : float
Reduced χ² of the fit.
spectrum : Spectrum
Input spectrum.
line_names : list of str
Ordered line names.
constraints : ConstraintSet
Applied constraints.
success : bool
Whether the optimiser converged.
"""
lines: dict[str, LineResult]
params: np.ndarray
model_flux: np.ndarray
continuum: np.ndarray
residuals: np.ndarray
chi2: float
spectrum: Spectrum
line_names: list[str] = field(default_factory=list)
constraints: ConstraintSet | None = None
success: bool = True
lya_params: np.ndarray | None = None # [A_peak, mu, sigma, alpha]
[docs]
def flux_upper_limit(
self,
line_name: str,
n_sigma: float = 3.0,
) -> float | None:
"""Compute a noise-based flux upper limit for a line.
Uses the local RMS of the residuals near the line position,
multiplied by *n_sigma* and the line width.
Parameters
----------
line_name : str
Line name.
n_sigma : float
Number of sigma for the upper limit (default 3).
Returns
-------
float or None
Integrated flux upper limit in f_lam units, or ``None``
if there are insufficient pixels near the line.
"""
if line_name not in self.lines:
raise KeyError(f"Line '{line_name}' not found in result.")
lr = self.lines[line_name]
spec = self.spectrum
_SQRT2PI = np.sqrt(2.0 * np.pi)
from .io import _ujy_to_flam
resid_flam = _ujy_to_flam(self.residuals, spec.wave_um)
valid = np.isfinite(resid_flam)
wave_A = spec.wave_A
lam_obs = lr.centroid_A
sig_line = lr.sigma_A
near = np.abs(wave_A - lam_obs)
window = valid & (near < 5.0 * sig_line) & (near > 2.0 * sig_line)
if int(np.sum(window)) < 3:
window = valid & (near < 10.0 * sig_line)
if int(np.sum(window)) < 3:
return None
rms = float(np.sqrt(np.nanmean(resid_flam[window] ** 2)))
return n_sigma * rms * sig_line * _SQRT2PI
[docs]
def flux_upper_limits(
self,
line_names: list[str] | None = None,
n_sigma: float = 3.0,
snr_threshold: float = 3.0,
) -> dict[str, float]:
"""Compute noise-based upper limits for low-SNR lines.
Parameters
----------
line_names : list of str, optional
Lines to check. If ``None``, checks all fitted lines.
n_sigma : float
Number of sigma for the upper limit (default 3).
snr_threshold : float
Only compute upper limits for lines with SNR below this
(default 3.0).
Returns
-------
dict of {str: float}
``{line_name: flux_upper_limit}`` for each line below
the SNR threshold.
"""
names = line_names if line_names is not None else list(self.lines.keys())
uls: dict[str, float] = {}
for name in names:
if name not in self.lines:
continue
if self.lines[name].snr_int_err >= snr_threshold:
continue
ul = self.flux_upper_limit(name, n_sigma=n_sigma)
if ul is not None:
uls[name] = ul
return uls
def _grating_bounds(
grating: str | None,
sigma_inst: np.ndarray,
dlam_A: np.ndarray,
line_wave_obs_A: float,
sigma_factor: float = 1.0,
) -> tuple[float, float, float]:
"""Return (sigma_lo, sigma_seed, sigma_hi) in Å for a given grating.
Parameters
----------
grating : str or None
Grating name.
sigma_inst : np.ndarray
Instrumental σ array (Å).
dlam_A : np.ndarray
Pixel widths (Å).
line_wave_obs_A : float
Observed wavelength of the line (Å).
sigma_factor : float
Multiplicative factor applied to the upper width bound.
Use values > 1 for stacked spectra where redshift scatter
broadens lines beyond the instrumental resolution (default 1.0).
Returns
-------
tuple of float
``(sigma_lo, sigma_seed, sigma_hi)`` in Å.
"""
# Find σ_inst at the line wavelength.
idx = np.argmin(np.abs(dlam_A.cumsum() - line_wave_obs_A + dlam_A.cumsum()[0]))
sig = sigma_inst[min(idx, len(sigma_inst) - 1)]
pix = np.median(dlam_A)
g = (grating or "").upper()
if "PRISM" in g:
return (0.40 * pix, 0.90 * sig, 1.70 * sig * sigma_factor)
if any(k in g for k in ("G140M", "G235M", "G395M")):
return (max(0.12 * pix, 0.22 * sig), 0.60 * sig, 2.0 * sig * sigma_factor)
if any(k in g for k in ("G140H", "G235H", "G395H")):
return (0.10 * pix, 0.50 * sig, 1.8 * sig * sigma_factor)
# Default (e.g. stacked spectra): generous bounds — lines are broadened
# by galaxy velocity dispersions and the stacking process.
return (0.20 * pix, 1.0 * sig, 5.0 * sig * sigma_factor)
[docs]
def fit_lines(
spectrum: Spectrum,
z: float,
*,
grating: str | None = None,
R: float | Callable | None = None,
lines: list[str] | None = None,
wave_range_A: tuple[float, float] | None = None,
deg: int = 2,
n_boot: int = 1000,
clip_sigma: float = 2.5,
n_jobs: int = -1,
save_path: str | Path | None = None,
sigma_factor: float = 1.0,
centroid_vmax: float = 500.0,
centroid_max_sigma: float = 1.0,
moving_average: bool | int = False,
tie_balmer_to_oiii: bool = True,
tie_uv_doublets: bool = True,
tie_uv_centroids: bool = True,
tie_uv_widths: bool = True,
sigma_overrides: dict[str, tuple[float, float]] | None = None,
centroid_overrides: dict[str, tuple[float, float]] | None = None,
lya_break: bool = False,
niv_doublet_ratio: float | None = None,
ciii_doublet_ratio: float | None = None,
_label: str = "",
_p0_hint: dict[str, tuple[float, float, float]] | None = None,
) -> FitResult:
"""Fit emission lines in a spectrum.
Parameters
----------
spectrum : Spectrum
Input spectrum.
z : float
Source redshift.
grating : str, optional
Grating name. If ``None``, uses ``spectrum.grating``.
R : float or callable, optional
Resolving power. If ``None``, uses ``spectrum.R`` or derives from *grating*.
lines : list of str, optional
Lines to fit. If ``None``, auto-detected from grating and wavelength coverage.
wave_range_A : tuple of float, optional
``(lo, hi)`` observed wavelength range in Angstroms. If given, only
pixels within this range are used for continuum fitting and line
fitting. Lines outside the range are excluded.
deg : int
Continuum polynomial degree (default 2).
n_boot : int
Number of bootstrap iterations for uncertainty estimation (default 200).
clip_sigma : float
Continuum sigma-clipping threshold.
n_jobs : int
Number of parallel jobs for bootstrap. ``-1`` uses all cores,
``1`` runs sequentially. Default ``-1``.
save_path : str or Path, optional
If given, export per-line measurements to this text file after fitting.
sigma_factor : float
Multiplicative factor applied to the upper line-width bound.
Use values > 1 for stacked spectra where redshift scatter
broadens lines beyond the instrumental resolution (default 1.0).
centroid_vmax : float
Maximum centroid offset in km/s (default 500). Sets the
velocity ceiling on how far any line's centroid can wander
from its systemic prediction. Increase for stacked spectra
with larger velocity offsets between lines.
centroid_max_sigma : float
Resolution-aware cap on narrow-line centroids: each narrow
line's centroid can drift at most ``centroid_max_sigma ×
σ_inst`` from its systemic prediction, where σ_inst is the
instrumental Gaussian σ evaluated at the line. The actual
bound is ``min(centroid_vmax/c × λ_obs, centroid_max_sigma ×
σ_inst)``. Broad components (``_BROAD``/``_BROAD2``) bypass
this cap so real outflow blueshifts aren't clipped. Default
1.0 (≈ ±130 km/s at G395M OIII; ≈ ±60 km/s at G395H).
moving_average : bool or int
If ``False`` (default), use polynomial continuum. If ``True``,
use a median filter with a default window of 75 pixels. If an
``int``, use that as the median-filter window size.
tie_balmer_to_oiii : bool
Tie Balmer line widths and centroids to [OIII] 5007 in velocity
space (default ``True``). Set to ``False`` for stacked spectra
where OIII may be broader than the true narrow component, causing
Ha to absorb flux from neighbouring lines like [NII] 6585.
tie_uv_doublets : bool
Tie UV doublet kinematics and fix resonance-line flux ratios.
Recommended for stacked spectra where doublets are poorly
resolved (default ``False``).
sigma_overrides : dict, optional
Per-line sigma (width) bounds in Angstroms, keyed by line name.
Each value is ``(sigma_lo, sigma_hi)``. Overrides the automatic
grating-based bounds for the specified lines. Convert from
velocity: ``sigma_A = sigma_kms / 299792.458 * lambda_obs_A``.
centroid_overrides : dict, optional
Per-line centroid bounds in observed-frame Angstroms, keyed by
line name. Each value is ``(mu_lo, mu_hi)``. Overrides the
automatic centroid bounds for the specified lines. Useful for
anchoring closely-spaced doublet members in noisy spectra.
Returns
-------
FitResult
"""
spec = spectrum
grating = grating or spec.grating
R = R or spec.R
if grating is None and R is None:
from .resolution import R_from_pixels
logger.info("No grating or R specified; estimating R from pixel spacing.")
R = R_from_pixels(spec.wave_um)
# Apply fit window: crop spectrum to wavelength range.
if wave_range_A is not None:
lo_A, hi_A = wave_range_A
mask_win = (spec.wave_A >= lo_A) & (spec.wave_A <= hi_A)
if np.sum(mask_win) < 10:
raise ValueError(
f"Fit window [{lo_A:.0f}, {hi_A:.0f}] Å contains only "
f"{np.sum(mask_win)} pixels."
)
spec = Spectrum(
wave_um=spec.wave_um[mask_win],
flux_ujy=spec.flux_ujy[mask_win],
err_ujy=spec.err_ujy[mask_win],
grating=spec.grating,
z=spec.z,
R=spec.R,
meta=spec.meta,
)
logger.info(
"Fit window: %.0f–%.0f Å (%d pixels)", lo_A, hi_A, spec.n_pix,
)
# Determine which lines to fit.
if lines is None:
if grating is not None:
candidate_lines = get_line_list(grating)
else:
# Infer line list from estimated resolving power.
# Prism: R ~ 30–300; gratings/stacks: R ≥ 500.
R_arr = resolve_R(spec.wave_um, R=R)
R_med = float(np.median(R_arr))
if R_med > 500:
candidate_lines = get_line_list("grating")
logger.info("Median R ≈ %.0f → using resolved line list.", R_med)
else:
candidate_lines = get_line_list("prism")
logger.info("Median R ≈ %.0f → using prism line list.", R_med)
line_names = observable_lines(
candidate_lines, z, spec.wave_um.min(), spec.wave_um.max()
)
else:
line_names = list(lines)
if len(line_names) == 0:
logger.warning("No observable lines for z=%.4f in wavelength range.", z)
return FitResult(
lines={},
params=np.array([]),
model_flux=np.zeros(spec.n_pix),
continuum=np.zeros(spec.n_pix),
residuals=spec.flux_ujy.copy(),
chi2=np.nan,
spectrum=spec,
line_names=[],
success=False,
)
logger.info("Fitting %d lines at z=%.4f: %s", len(line_names), z, line_names)
# Resolution.
sig_inst = sigma_inst_A(spec.wave_um, grating=grating, R=R)
R_arr = resolve_R(spec.wave_um, grating=grating, R=R)
R_med = float(np.median(R_arr))
R_lo, R_hi = float(np.min(R_arr)), float(np.max(R_arr))
if not _label:
if abs(R_hi - R_lo) < 10:
print(f"Resolving power: R = {R_med:.0f}")
else:
print(f"Resolving power: R ≈ {R_med:.0f} (range {R_lo:.0f}–{R_hi:.0f})")
# Continuum subtraction.
# Auto-enable lya_break when Lya is in the line list.
_lya_in_lines = "Lya" in line_names
_lya_break = lya_break or _lya_in_lines
continuum = fit_continuum(
spec.wave_um,
spec.flux_ujy,
spec.err_ujy,
z,
line_names,
grating=grating,
R=R,
deg=deg,
clip_sigma=clip_sigma,
moving_average=moving_average,
lya_break=_lya_break,
)
flux_sub = spec.flux_ujy - continuum
# Convert to F_λ for fitting.
from .io import _ujy_to_flam, _flam_to_ujy
flam = _ujy_to_flam(flux_sub, spec.wave_um)
flam_err = _ujy_to_flam(spec.err_ujy, spec.wave_um)
valid = np.isfinite(flam) & np.isfinite(flam_err) & (flam_err > 0)
# Exclude pixels blueward of NV (IGM-absorbed / Lyman-break region).
# When fitting Lyα, extend to cover the Lyα region (down to ~1150 Å rest).
if _lya_in_lines:
_blue_cutoff_A = 1150.0 * (1.0 + z)
valid &= spec.wave_A >= _blue_cutoff_A
else:
nv_obs_A = REST_LINES_A["NV_1"] * (1.0 + z)
valid &= spec.wave_A >= nv_obs_A
# Drop lines that fall in detector gaps (< 50% valid pixels within ±5σ).
_kept: list[str] = []
for name in line_names:
lam_obs_A = REST_LINES_A[name] * (1.0 + z)
_, sig_seed, _ = _grating_bounds(grating, sig_inst, spec.dlam_A, lam_obs_A, sigma_factor)
near_mask = np.abs(spec.wave_A - lam_obs_A) < 5 * sig_seed
n_valid = int(np.sum(valid & near_mask))
n_total = int(np.sum(near_mask))
frac = n_valid / n_total if n_total > 0 else 0.0
if frac >= 0.5:
_kept.append(name)
else:
logger.info(
"Dropping %s (obs %.0f A): %d/%d valid pixels (%.0f%%) in ±5sigma",
name, lam_obs_A, n_valid, n_total, 100 * frac,
)
if len(_kept) < len(line_names):
logger.info(
"Kept %d / %d lines after gap filtering.", len(_kept), len(line_names),
)
line_names = _kept
if len(line_names) == 0:
logger.warning("All lines fall in detector gaps.")
return FitResult(
lines={},
params=np.array([]),
model_flux=np.zeros(spec.n_pix),
continuum=np.zeros(spec.n_pix),
residuals=spec.flux_ujy.copy(),
chi2=np.nan,
spectrum=spec,
line_names=[],
success=False,
)
# Pixel info.
edges = spec.wave_edges_A
dlam = spec.dlam_A
left = edges[:-1]
right = edges[1:]
# Pixel weights.
w_pix = pixel_weight(dlam)
# Detect unresolved intercombination doublets: if the doublet
# separation is < 2×FWHM (≈ 4.71 × sigma_inst), the components
# cannot be reliably decomposed. Fix the amplitude ratio to the
# low-density limit for fitting stability.
from .constraints import _INTERCOM_LOW_DENSITY_RATIOS
_BLEND_THRESHOLD = 4.71 # 2 × FWHM in sigma units
_blended: set[str] = set()
if tie_uv_doublets:
for (pri, sec) in _INTERCOM_LOW_DENSITY_RATIOS:
if pri in line_names and sec in line_names:
lam_pri = REST_LINES_A[pri] * (1.0 + z)
lam_sec = REST_LINES_A[sec] * (1.0 + z)
sep = abs(lam_pri - lam_sec)
idx_near_pri = np.argmin(np.abs(spec.wave_A - lam_pri))
sig_at_line = sig_inst[idx_near_pri]
if sep < _BLEND_THRESHOLD * sig_at_line:
_blended.add(sec)
print(
f" Blended: {pri}–{sec} "
f"(sep={sep:.1f} Å < {_BLEND_THRESHOLD:.1f}×σ_inst="
f"{_BLEND_THRESHOLD * sig_at_line:.1f} Å) "
f"→ fixing ratio"
)
# --- Separate Lyα from regular Gaussian lines ---
# Single asymmetric Gaussian (Owen/Bolan+2025 parameterisation):
# f(λ) = A_peak × exp(-(λ-μ)²/(2σ²)) × [1 + erf(α(λ-μ)/(√2 σ))]
# Parameters: [A_peak, mu, sigma, alpha]
_lya_params = None # (p0, lb, ub) for 4-element vector
if _lya_in_lines:
line_names = [n for n in line_names if n != "Lya"]
lya_obs_A = REST_LINES_A["Lya"] * (1.0 + z)
idx_lya = np.argmin(np.abs(spec.wave_A - lya_obs_A))
local_sig = sig_inst[idx_lya]
near_lya = np.abs(spec.wave_A - lya_obs_A) < 10 * local_sig
# Find the actual peak pixel location.
_near_idx = np.where(near_lya & valid)[0]
if len(_near_idx) > 0:
_peak_idx = _near_idx[np.argmax(flam[_near_idx])]
peak_lya = flam[_peak_idx]
peak_wave_A = spec.wave_A[_peak_idx]
else:
peak_lya = 1.0
peak_wave_A = lya_obs_A + 1.0
if not np.isfinite(peak_lya) or peak_lya <= 0:
peak_lya = 1.0
_C_KMS = 299792.458
cent_margin = max(centroid_vmax / _C_KMS * lya_obs_A, 5.0)
# σ controls the blue-side sharpness of the profile; the red
# tail width comes from α (the skewness). For stacked spectra
# the Lyα peak is resolution-limited, so σ should be tight.
sig_seed = max(local_sig, 0.5)
sig_lo = 0.2
sig_hi = 3.0 * sigma_factor # keep narrow — asymmetry handles the tail
# A_peak seed: with α>0 the actual peak exceeds A_peak, so
# seed below the observed peak to leave room.
A_peak_seed = 0.5 * peak_lya
# α seed: Lyα is generically red-asymmetric; start with
# moderate skewness.
alpha_seed = 5.0
lya_p0 = np.array([A_peak_seed, peak_wave_A, sig_seed, alpha_seed])
lya_lb = np.array([0.0, lya_obs_A - cent_margin, sig_lo, 0.0])
lya_ub = np.array([
150.0 * peak_lya, lya_obs_A + cent_margin, sig_hi, 30.0,
])
_lya_params = (lya_p0, lya_lb, lya_ub)
logger.info(
"Lyα asymmetric Gaussian: σ [%.1f, %.1f] Å, α [0, 30], "
"peak at %.1f Å (%.3e)",
sig_lo, sig_hi, peak_wave_A, peak_lya,
)
# Setup constraints.
constraints = ConstraintSet(
line_names,
tie_balmer_to_oiii=tie_balmer_to_oiii,
tie_uv_doublets=tie_uv_doublets,
tie_uv_centroids=tie_uv_centroids,
tie_uv_widths=tie_uv_widths,
blended_doublets=_blended if _blended else None,
niv_doublet_ratio=niv_doublet_ratio,
ciii_doublet_ratio=ciii_doublet_ratio,
)
# Initial parameters: [amplitudes, centroids, sigmas].
nL = len(line_names)
p0 = np.zeros(3 * nL)
lb = np.zeros(3 * nL)
ub = np.zeros(3 * nL)
for i, name in enumerate(line_names):
lam_obs_A = REST_LINES_A[name] * (1.0 + z)
sig_lo, sig_seed, sig_hi = _grating_bounds(grating, sig_inst, dlam, lam_obs_A, sigma_factor)
# Find peak/trough flux near the line for amplitude seeding.
near = np.abs(spec.wave_A - lam_obs_A)
idx_near = np.where(near < 5 * sig_seed)[0]
is_abs = name.startswith("abs_")
if is_abs:
# Absorption line: seed from the deepest trough.
if len(idx_near) > 0:
peak_flam = abs(np.nanmin(flam[idx_near]))
else:
peak_flam = 1.0
else:
# Emission line: seed from the peak.
if len(idx_near) > 0:
peak_flam = np.nanmax(flam[idx_near])
else:
peak_flam = np.nanmax(flam[valid]) if np.any(valid) else 1.0
if not np.isfinite(peak_flam) or peak_flam <= 0:
peak_flam = 1.0
# Amplitude seed = peak × sqrt(2π) × σ (area under Gaussian).
A_seed = max(peak_flam * _SQRT2PI * sig_seed, 1e-30)
# Amplitude bounds.
if is_abs:
# Absorption: amplitude must be ≤ 0 (negative Gaussian).
p0[i] = -A_seed
lb[i] = -150.0 * max(peak_flam, 1e-30) * _SQRT2PI * sig_hi
ub[i] = 0.0
else:
p0[i] = A_seed
lb[i] = 0.0
ub[i] = 150.0 * max(peak_flam, 1e-30) * _SQRT2PI * sig_hi
# Centroid bounds — combine a physical-velocity cap with a
# resolution-aware cap of ``centroid_max_sigma × σ_inst`` at the
# line so narrow lines can't wander further than ~1 instrumental
# σ from systemic at high R. Broad components keep the
# velocity-only cap so real outflow blueshifts aren't clipped.
# The margin is then floored at 2 pixel widths (coarse
# sampling) and capped at half the distance to the nearest
# neighbour so centroids cannot blend.
local_sig = sig_inst[np.argmin(np.abs(spec.wave_A - lam_obs_A))]
_C_KMS_CENT = 299792.458
cent_margin_v = centroid_vmax / _C_KMS_CENT * lam_obs_A
if "BROAD" in name:
cent_margin = cent_margin_v
else:
cent_margin_res = centroid_max_sigma * local_sig
cent_margin = min(cent_margin_v, cent_margin_res)
# Floor: at least 2 pixel widths for coarsely sampled spectra.
cent_margin = max(cent_margin, 2.0 * np.median(dlam))
# Cap at half the distance to the nearest neighbour to prevent
# lines from drifting into each other.
# Exclude the narrow counterpart of a broad line: it sits at the
# same rest wavelength, so naive nearest-neighbour distance would
# be 0 Å and collapse the centroid bounds. For Balmer broads
# this is masked by the centroid tie in ConstraintSet, but
# untied broads (e.g. OIII outflow components) need this filter.
own_rest = REST_LINES_A[name]
other_obs = [
REST_LINES_A[n] * (1.0 + z)
for j, n in enumerate(line_names)
if j != i and "BROAD" not in n and REST_LINES_A[n] != own_rest
]
if other_obs:
min_sep = min(abs(lam_obs_A - o) for o in other_obs)
cent_margin = min(cent_margin, 0.5 * min_sep)
# Apply per-line centroid overrides if provided.
if centroid_overrides and name in centroid_overrides:
cent_lo, cent_hi = centroid_overrides[name]
p0[nL + i] = 0.5 * (cent_lo + cent_hi)
lb[nL + i] = cent_lo
ub[nL + i] = cent_hi
else:
p0[nL + i] = lam_obs_A
lb[nL + i] = lam_obs_A - cent_margin
ub[nL + i] = lam_obs_A + cent_margin
# Sigma bounds — broad components use physically motivated velocity
# widths (km/s) converted to σ_λ at the observed wavelength.
_C_KMS = 299792.458 # speed of light (km/s)
if "_BROAD2" in name:
# Very broad (BLR): FWHM ~ 2000–5000 km/s.
from .broad import (
BROAD2_SIGMA_V_LO, BROAD2_SIGMA_V_SEED, BROAD2_SIGMA_V_HI,
)
sig_lo = BROAD2_SIGMA_V_LO / _C_KMS * lam_obs_A
sig_seed = BROAD2_SIGMA_V_SEED / _C_KMS * lam_obs_A
sig_hi = BROAD2_SIGMA_V_HI / _C_KMS * lam_obs_A
elif "_BROAD" in name:
# Intermediate broad (outflows / NLR): FWHM ~ 500–2000 km/s.
from .broad import (
BROAD1_SIGMA_V_LO, BROAD1_SIGMA_V_SEED, BROAD1_SIGMA_V_HI,
)
sig_lo = BROAD1_SIGMA_V_LO / _C_KMS * lam_obs_A
sig_seed = BROAD1_SIGMA_V_SEED / _C_KMS * lam_obs_A
sig_hi = BROAD1_SIGMA_V_HI / _C_KMS * lam_obs_A
# Hard cap: no component wider than 500 Å (prevents fitting noise).
_MAX_SIGMA_A = 500.0
sig_hi = min(sig_hi, _MAX_SIGMA_A)
sig_seed = min(sig_seed, 0.9 * sig_hi)
# Apply per-line sigma overrides if provided.
if sigma_overrides and name in sigma_overrides:
sig_lo, sig_hi = sigma_overrides[name]
sig_hi = min(sig_hi, _MAX_SIGMA_A)
sig_seed = 0.5 * (sig_lo + sig_hi)
p0[2 * nL + i] = sig_seed
lb[2 * nL + i] = sig_lo
ub[2 * nL + i] = sig_hi
# Seed UV doublet secondaries at the expected flux ratio of the
# primary. Without this, both members pick up the same peak flux
# as their seed, and the optimiser often dumps all flux into one
# member. The amplitude remains free — this just gives a
# physically motivated starting point.
from .constraints import _INTERCOM_LOW_DENSITY_RATIOS, NV_RATIO
idx = {name: i for i, name in enumerate(line_names)}
_ALL_DOUBLETS = [
*[(*pair, ratio) for pair, ratio in _INTERCOM_LOW_DENSITY_RATIOS.items()],
("NV_1", "NV_2", NV_RATIO),
]
for pri, sec, ratio in _ALL_DOUBLETS:
if pri in idx and sec in idx:
i_pri = idx[pri]
i_sec = idx[sec]
p0[i_sec] = np.clip(p0[i_pri] * ratio, lb[i_sec], ub[i_sec])
# Cap the secondary's upper bound: physically the secondary
# can never exceed ~2× the primary (even at extreme densities
# the ratio stays < 1.5). Without this cap, the optimizer
# can push the secondary to 150× peak and fit noise.
# Use 3× the primary seed as a generous ceiling.
ub_cap = max(3.0 * p0[i_pri], p0[i_sec] * 5.0)
ub[i_sec] = min(ub[i_sec], ub_cap)
# Override seeds from a previous fit (e.g. narrow-only results).
# For non-broad lines, also set an amplitude floor at 30% of the
# narrow-only value to prevent the optimizer from zeroing out narrow
# lines when a broad component can absorb their flux.
if _p0_hint is not None:
for hint_name, (hint_A, hint_mu, hint_sig) in _p0_hint.items():
if hint_name in idx:
j = idx[hint_name]
p0[j] = np.clip(hint_A, lb[j], ub[j])
p0[nL + j] = np.clip(hint_mu, lb[nL + j], ub[nL + j])
p0[2 * nL + j] = np.clip(hint_sig, lb[2 * nL + j], ub[2 * nL + j])
# Amplitude floor for narrow lines (not broad components).
if "BROAD" not in hint_name and hint_A > 0:
lb[j] = max(lb[j], 0.3 * hint_A)
# Mask constrained parameters: only optimise free ones.
free_mask = constraints.free_mask()
# Build combined parameter vector:
# [gaussian_free..., A_peak, mu, sigma, alpha]
_n_lya = 4 if _lya_params is not None else 0
# Pixel centres for truncation mask.
_centres = 0.5 * (left + right)
# Cap inflated errors near the Lyα peak. In stacked spectra the
# galaxy-to-galaxy Lyα scatter inflates the error at the peak to
# 2-3× the wing error, making the peak invisible to chi-squared.
# Capping at the local median gives the peak proper weight.
if _n_lya > 0:
_lya_vicinity = np.abs(spec.wave_A - lya_obs_A) < 15.0
_med_err = np.nanmedian(flam_err[_lya_vicinity & valid])
if np.isfinite(_med_err) and _med_err > 0:
_peak_region = np.abs(spec.wave_A - peak_wave_A) < 3.0
flam_err = flam_err.copy() # don't mutate the original
flam_err[_peak_region] = np.minimum(flam_err[_peak_region], _med_err)
logger.debug(
"Lyα error cap: median=%.3e, capped %d pixels",
_med_err, int(np.sum(_peak_region)),
)
def _lya_component(p_lya: np.ndarray) -> np.ndarray:
"""Evaluate the asymmetric Gaussian Lyα model.
p_lya = [A_peak, mu, sigma, alpha].
Blue-side suppression comes from the α parameter (erf term)
and the lya_break continuum, not a hard cutoff on the line.
"""
return asymmetric_gaussian(_centres, p_lya[0], p_lya[1], p_lya[2], p_lya[3])
def residual_fn(p_combined: np.ndarray) -> np.ndarray:
"""Weighted residuals for least_squares."""
p_free = p_combined[:len(p_combined) - _n_lya]
p_full = constraints.expand_free_to_full(p_free)
model = build_model(p_full, edges, nL)
if _n_lya > 0:
model = model + _lya_component(p_combined[-_n_lya:])
resid = (flam - model) / flam_err
resid *= w_pix
resid[~valid] = 0.0
return resid
p0_free = p0[free_mask]
lb_free = lb[free_mask]
ub_free = ub[free_mask]
# Append Lyα parameters if present.
if _lya_params is not None:
lya_p0, lya_lb, lya_ub = _lya_params
# --- Pre-fit: asymmetric Gaussian on ±10 Å with unweighted residuals ---
# Locks onto peak shape before the full error-weighted joint fit.
_lya_region = np.abs(spec.wave_A - peak_wave_A) < 10.0
_centres_lya = _centres[_lya_region]
_flam_lya = flam[_lya_region]
_valid_lya = valid[_lya_region]
def _lya_prefit_resid(p: np.ndarray) -> np.ndarray:
"""Unweighted residuals for Lyα pre-fit."""
model = asymmetric_gaussian(_centres_lya, p[0], p[1], p[2], p[3])
resid = _flam_lya - model
resid[~_valid_lya] = 0.0
return resid
_pre_p0 = np.clip(lya_p0, lya_lb + 1e-30, lya_ub - 1e-30)
_pre_result = least_squares(
_lya_prefit_resid, _pre_p0,
bounds=(lya_lb, lya_ub),
max_nfev=10000, xtol=1e-8, ftol=1e-8, x_scale="jac",
)
lya_p0[:] = _pre_result.x
# Tighten μ bounds around pre-fit centroid.
_prefit_mu = _pre_result.x[1]
lya_lb[1] = max(lya_lb[1], _prefit_mu - 1.0)
lya_ub[1] = min(lya_ub[1], _prefit_mu + 1.0)
logger.info(
"Lyα pre-fit: A=%.3e, μ=%.1f Å, σ=%.2f Å, α=%.1f",
*_pre_result.x,
)
p0_free = np.concatenate([p0_free, lya_p0])
lb_free = np.concatenate([lb_free, lya_lb])
ub_free = np.concatenate([ub_free, lya_ub])
# Clip seeds to bounds.
p0_free = np.clip(p0_free, lb_free + 1e-30, ub_free - 1e-30)
result = least_squares(
residual_fn,
p0_free,
bounds=(lb_free, ub_free),
max_nfev=80000,
xtol=1e-8,
ftol=1e-8,
x_scale="jac",
)
# Unpack results.
p_gauss_free = result.x[:len(result.x) - _n_lya]
p_best = constraints.expand_free_to_full(p_gauss_free)
model_flam = build_model(p_best, edges, nL)
_lya_best = None
if _n_lya > 0:
_lya_best = result.x[-_n_lya:]
model_flam = model_flam + _lya_component(_lya_best)
model_ujy = _flam_to_ujy(model_flam, spec.wave_um)
resid_ujy = flux_sub - model_ujy
# Chi-squared.
r = (flam - model_flam) / flam_err
r[~valid] = np.nan
n_data = np.sum(valid)
n_free = np.sum(free_mask)
dof = max(n_data - n_free, 1)
chi2_red = float(np.nansum(r**2)) / dof
# Bootstrap uncertainties.
flux_errs = _bootstrap_uncertainties(
flam, flam_err, valid, edges, nL, constraints, free_mask,
lb_free, ub_free, p0_free, w_pix, n_boot, label=_label,
n_jobs=n_jobs,
)
# Build per-line results.
line_results = {}
cont_flam = _ujy_to_flam(continuum, spec.wave_um)
# Pre-compute the continuum-subtracted residual in f_lam space for
# local continuum RMS estimates.
resid_flam = flam - build_model(p_best, edges, nL)
# Also need the f_lam error array for peak SNR from the uncertainty array.
# (flam_err is already available from earlier.)
for i, name in enumerate(line_names):
A = p_best[i]
mu = p_best[nL + i]
sig = p_best[2 * nL + i]
flux_line = A # Area-normalised Gaussian: integral = amplitude.
f_err = flux_errs[i] if flux_errs is not None else _analytic_flux_err(
A, sig, flam_err, spec.wave_A, mu, valid
)
# --- SNR 1: integrated, bootstrap error ---
snr_int_err = flux_line / f_err if f_err > 0 else 0.0
# --- Peak of the Gaussian model (f_lam units) ---
model_peak = A / (sig * _SQRT2PI) if sig > 0 else 0.0
# --- SNR 3: peak, uncertainty array ---
idx_peak = np.argmin(np.abs(spec.wave_A - mu))
err_at_peak = flam_err[idx_peak] if valid[idx_peak] else 0.0
snr_peak_err = model_peak / err_at_peak if err_at_peak > 0 else 0.0
# --- Local continuum RMS (±15σ window, excluding ±3σ) ---
outer_mask = np.abs(spec.wave_A - mu) < 15.0 * sig
inner_mask = np.abs(spec.wave_A - mu) < 3.0 * sig
cont_region = outer_mask & ~inner_mask & valid
if np.sum(cont_region) >= 3:
cont_rms = float(np.std(resid_flam[cont_region]))
else:
# Fallback: use the uncertainty array median near the line.
fallback = outer_mask & valid
cont_rms = float(np.median(flam_err[fallback])) if np.any(fallback) else 0.0
# --- SNR 4: peak, continuum RMS ---
snr_peak_cont = model_peak / cont_rms if cont_rms > 0 else 0.0
# --- SNR 2: integrated, continuum RMS ---
# Noise on integrated flux = cont_rms × √(N_pix in ±3σ window).
n_pix_line = int(np.sum(inner_mask & valid))
int_noise_cont = cont_rms * sqrt(n_pix_line) if n_pix_line > 0 else 0.0
snr_int_cont = flux_line / int_noise_cont if int_noise_cont > 0 else 0.0
# Equivalent width (rest-frame).
# Use continuum at the line centroid. If the polynomial continuum
# is unreliable (near-zero or negative), fall back to the local
# median flux in a ±5σ window around the line. If that is also
# non-positive, report EW as NaN.
lam_rest_A = REST_LINES_A[name]
idx_cont = np.argmin(np.abs(spec.wave_A - mu))
cont_at_line = cont_flam[idx_cont]
if cont_at_line <= 0:
near_mask = np.abs(spec.wave_A - mu) < 5.0 * sig
near_valid = near_mask & valid
if np.any(near_valid):
local_median_ujy = np.nanmedian(spec.flux_ujy[near_valid])
cont_at_line = _ujy_to_flam(
np.array([max(local_median_ujy, 0.0)]),
np.array([spec.wave_um[idx_cont]]),
)[0]
ew_rest = flux_line / cont_at_line / (1.0 + z) if cont_at_line > 0 else np.nan
line_results[name] = LineResult(
name=name,
rest_wave_A=lam_rest_A,
amplitude=A,
centroid_A=mu,
sigma_A=sig,
flux=flux_line,
flux_err=f_err,
ew_A=ew_rest,
snr_int_err=snr_int_err,
snr_int_cont=snr_int_cont,
snr_peak_err=snr_peak_err,
snr_peak_cont=snr_peak_cont,
)
# Add Lyα result if fitted.
if _lya_best is not None:
A_peak_lya = _lya_best[0]
mu_lya = _lya_best[1]
sig_lya = _lya_best[2]
alpha_lya = _lya_best[3]
# Integrated flux: numerical integration of the asymmetric Gaussian.
_lya_profile = _lya_component(_lya_best)
dlam = np.diff(edges)
flux_lya = float(np.sum(_lya_profile * dlam))
# Uncertainty from local noise.
f_err_lya = _analytic_flux_err(
flux_lya, sig_lya, flam_err, spec.wave_A, mu_lya, valid,
)
snr_lya = flux_lya / f_err_lya if f_err_lya > 0 else 0.0
# EW.
idx_cont_lya = np.argmin(np.abs(spec.wave_A - mu_lya))
cont_at_lya = cont_flam[idx_cont_lya] if cont_flam[idx_cont_lya] > 0 else np.nan
ew_lya = flux_lya / cont_at_lya / (1.0 + z) if np.isfinite(cont_at_lya) and cont_at_lya > 0 else np.nan
line_results["Lya"] = LineResult(
name="Lya",
rest_wave_A=REST_LINES_A["Lya"],
amplitude=flux_lya,
centroid_A=mu_lya,
sigma_A=sig_lya,
flux=flux_lya,
flux_err=f_err_lya,
ew_A=ew_lya,
snr_int_err=snr_lya,
snr_int_cont=snr_lya,
snr_peak_err=snr_lya,
snr_peak_cont=snr_lya,
)
line_names = ["Lya"] + line_names
logger.info(
"Lyα fit: A_peak=%.3e, μ=%.1f Å, σ=%.1f Å, α=%.1f, "
"flux=%.3e",
A_peak_lya, mu_lya, sig_lya, alpha_lya, flux_lya,
)
fit_result = FitResult(
lines=line_results,
params=p_best,
model_flux=model_ujy,
continuum=continuum,
residuals=resid_ujy,
chi2=chi2_red,
spectrum=spec,
line_names=line_names,
constraints=constraints,
success=bool(result.success),
lya_params=_lya_best,
)
if save_path is not None:
from .io import export_lines_txt
export_lines_txt(fit_result, save_path, z=z)
return fit_result
def _analytic_flux_err(
A: float,
sigma_A: float,
err_flam: np.ndarray,
wave_A: np.ndarray,
mu_A: float,
valid: np.ndarray,
) -> float:
"""Estimate flux uncertainty from local noise near the line.
Parameters
----------
A : float
Best-fit amplitude.
sigma_A : float
Best-fit line width (Å).
err_flam : np.ndarray
Flux error array (erg/s/cm²/Å).
wave_A : np.ndarray
Wavelength array (Å).
mu_A : float
Line centroid (Å).
valid : np.ndarray
Validity mask.
Returns
-------
float
Estimated flux uncertainty.
"""
near = np.abs(wave_A - mu_A) < 3.0 * sigma_A
sel = near & valid
if np.sum(sel) < 2:
return np.abs(A) * 0.5 # fallback
# RMS noise in the line region, scaled by effective width.
dlam = np.median(np.diff(wave_A[sel]))
n_eff = _SQRT2PI * sigma_A / dlam if dlam > 0 else 1.0
rms = np.sqrt(np.nanmean(err_flam[sel] ** 2))
return rms * dlam * sqrt(n_eff)
def _run_single_bootstrap(
noise_vec: np.ndarray,
flam: np.ndarray,
flam_err: np.ndarray,
valid: np.ndarray,
edges: np.ndarray,
nL: int,
constraints: ConstraintSet,
lb_free: np.ndarray,
ub_free: np.ndarray,
p0_free: np.ndarray,
w_pix: np.ndarray,
) -> np.ndarray:
"""Run a single bootstrap iteration.
Parameters
----------
noise_vec : np.ndarray
Pre-generated standard-normal noise vector (length ``n_pix``).
Returns
-------
np.ndarray
Flux (amplitude) for each line, or NaN if the fit fails.
"""
flam_b = flam + noise_vec * flam_err
def residual_b(p_free: np.ndarray) -> np.ndarray:
p_full = constraints.expand_free_to_full(p_free)
model = build_model(p_full, edges, nL)
resid = (flam_b - model) / flam_err
resid *= w_pix
resid[~valid] = 0.0
return resid
try:
res_b = least_squares(
residual_b, p0_free, bounds=(lb_free, ub_free),
max_nfev=20000, xtol=1e-6, ftol=1e-6,
x_scale="jac",
)
p_full_b = constraints.expand_free_to_full(res_b.x)
return p_full_b[:nL]
except Exception:
return np.full(nL, np.nan)
def _bootstrap_uncertainties(
flam: np.ndarray,
flam_err: np.ndarray,
valid: np.ndarray,
edges: np.ndarray,
nL: int,
constraints: ConstraintSet,
free_mask: np.ndarray,
lb_free: np.ndarray,
ub_free: np.ndarray,
p0_free: np.ndarray,
w_pix: np.ndarray,
n_boot: int,
label: str = "",
n_jobs: int = -1,
) -> np.ndarray | None:
"""Run bootstrap resampling for flux uncertainties.
Parameters
----------
n_jobs : int
Number of parallel jobs for bootstrap. ``-1`` uses all cores,
``1`` runs sequentially. Default ``-1``.
Returns
-------
np.ndarray or None
Standard deviation of flux for each line, or None if n_boot == 0.
"""
if n_boot <= 0:
return None
# Pre-generate all noise vectors for reproducibility.
rng = np.random.default_rng(42)
noise_all = rng.standard_normal((n_boot, len(flam)))
# Shared keyword arguments for _run_single_bootstrap.
shared_kwargs = dict(
flam=flam, flam_err=flam_err, valid=valid, edges=edges,
nL=nL, constraints=constraints, lb_free=lb_free, ub_free=ub_free,
p0_free=p0_free, w_pix=w_pix,
)
desc = f"Bootstrap ({label})" if label else "Bootstrap"
try:
from joblib import Parallel, delayed
from tqdm import tqdm
logger.info("%s: %d iterations, n_jobs=%d", desc, n_boot, n_jobs)
# return_as="generator" yields results as they complete,
# letting tqdm update incrementally.
gen = Parallel(n_jobs=n_jobs, return_as="generator", prefer="processes")(
delayed(_run_single_bootstrap)(noise_all[b], **shared_kwargs)
for b in range(n_boot)
)
results = list(tqdm(gen, total=n_boot, desc=desc, unit="iter"))
flux_samples = np.array(results)
except ImportError:
logger.warning("joblib not installed; falling back to sequential bootstrap.")
from tqdm import tqdm
flux_samples = np.zeros((n_boot, nL))
for b in tqdm(range(n_boot), desc=desc, unit="iter"):
flux_samples[b, :] = _run_single_bootstrap(
noise_all[b], **shared_kwargs
)
# Use sigma-clipped std to reject pathological bootstrap iterations
# where unconstrained parameters (e.g. UV secondaries) blow up.
from astropy.stats import sigma_clipped_stats
errs = np.zeros(flux_samples.shape[1])
for i in range(flux_samples.shape[1]):
col = flux_samples[:, i]
finite = col[np.isfinite(col)]
if len(finite) < 3:
errs[i] = np.nanstd(col)
else:
_, _, std = sigma_clipped_stats(finite, sigma=3.0)
# Fall back to nanstd if clipping removes all variance.
errs[i] = std if std > 0 else np.nanstd(finite)
return errs