Source code for jwspecfit.dla

"""DLA column density fitter — measure N_HI from Lya damping wings.

Fits a UV power-law continuum attenuated by a damped Lyman-alpha
absorber (DLA) to derive the neutral hydrogen column density N_HI
in the galaxy's ISM.

The model (following Pollock et al. 2026, Eq. 4) is:

    F(lambda) = F0 * (lambda/lambda_pivot)^beta_UV * exp(-tau_DLA)

where tau_DLA = C * a * N_HI * H(a, x) uses the Voigt-Hjerting
function H(a, x) evaluated via the Faddeeva function (exact).

The model is convolved with the instrumental line-spread function
(Gaussian with FWHM = lambda / R) before comparison to data, as
in Pollock et al. (2026).

Sampling is performed with ``dynesty`` nested sampling (matching
the paper's methodology).

References
----------
Pollock, C. L., et al. 2026, A&A, arXiv:2602.11783.
    Method and application to z > 9 galaxies.
Tepper-Garcia, T. 2006, MNRAS, 369, 2025.
    Voigt-Hjerting function framework.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import Any

import numpy as np
from scipy.special import wofz

from .lines import REST_LINES_A

# NumPy 2.0 renamed ``np.trapz`` to ``np.trapezoid`` and removed the old
# name.  Resolve once at import so the package works under both old and
# new NumPy without per-call overhead.
_trapezoid = np.trapezoid if hasattr(np, "trapezoid") else np.trapz

logger = logging.getLogger(__name__)

# --------------------------------------------------------------------------
# Physical constants (CGS)
# --------------------------------------------------------------------------
_LAMBDA_LYA_A = 1215.670       # Lya rest wavelength (Angstrom)
_F_ALPHA = 0.4162              # Lya oscillator strength
_GAMMA_ALPHA = 6.265e8         # Lya damping constant (s^-1)
_E_CGS = 4.8032e-10            # electron charge (esu)
_ME_CGS = 9.1094e-28           # electron mass (g)
_C_CGS = 2.9979e10             # speed of light (cm/s)
_B_DEFAULT_KMS = 30.0          # default Doppler parameter (km/s)
_LAMBDA_PIVOT_A = 1500.0       # pivot wavelength for power-law normalisation
_R_ALPHA = _GAMMA_ALPHA * _LAMBDA_LYA_A * 1e-8 / (4.0 * np.pi * _C_CGS)  # ≈ 2.02e-8
_IGM_Z_MIN_DEFAULT = 5.3       # IGM integration lower bound (Bosman+22 reionisation end)


# --------------------------------------------------------------------------
# IGM damping wing (Miralda-Escudé 1998 / Totani et al. 2006 closed form)
# --------------------------------------------------------------------------

def _IGM_I(x: np.ndarray) -> np.ndarray:
    """Helper integral for the Miralda-Escudé (1998) damping wing.

    I(x) = x^(9/2)/(1-x) + (9/7) x^(7/2) + (9/5) x^(5/2) + 3 x^(3/2)
           + 9 x^(1/2) - (9/2) ln[(1+x^(1/2))/(1-x^(1/2))]

    Defined for x ∈ (0, 1).
    """
    sqx = np.sqrt(x)
    return (
        x ** 4.5 / (1.0 - x)
        + (9.0 / 7.0) * x ** 3.5
        + (9.0 / 5.0) * x ** 2.5
        + 3.0 * x ** 1.5
        + 9.0 * sqx
        - 4.5 * np.log((1.0 + sqx) / (1.0 - sqx))
    )


[docs] def tau_IGM_DW( wave_A: np.ndarray, x_HI: float, z_source: float, z_min: float = _IGM_Z_MIN_DEFAULT, cosmo: Any = None, ) -> np.ndarray: """IGM damping-wing optical depth from a uniformly neutral IGM. Implements the closed-form solution of Miralda-Escudé (1998), as written in Totani et al. (2006) Eq. 3 and used in Pollock et al. (2026) Eq. 3. Computes the redward Lyα damping-wing absorption by neutral hydrogen of mean fraction ``x_HI`` distributed uniformly between redshift ``z_min`` and ``z_source``. Parameters ---------- wave_A : np.ndarray Observed-frame wavelength array in Angstrom. x_HI : float Volume-averaged neutral hydrogen fraction of the IGM. z_source : float Source redshift (upper bound of the IGM integration). z_min : float Lower bound of the IGM integration; default 5.3 (Bosman+22). cosmo : astropy.cosmology, optional Cosmology used to compute the line-centre Gunn-Peterson optical depth. Defaults to Planck18. Returns ------- np.ndarray Optical depth τ_IGM(λ). Zero for pixels redward of the source's Lyα (the formula returns zero by construction outside the integration range), and a finite damping-wing contribution between λ_α(1+z_source) and longer wavelengths. Notes ----- For pixels blueward of λ_α(1+z_source) the photon would have been absorbed inside the GP forest already; the IGM damping-wing formula is not the right description there. Callers should enforce a separate model = 0 cut blueward of source Lyα. """ if cosmo is None: from astropy.cosmology import Planck18 as cosmo wave_A = np.asarray(wave_A, dtype=float) out = np.zeros_like(wave_A) if x_HI <= 0.0: return out # Redshift at which a photon of observed λ would have hit Lyα. z_obs = wave_A / _LAMBDA_LYA_A - 1.0 # The damping wing applies redward of source Lyα (z_obs > z_source). valid = z_obs > z_source if not np.any(valid): return out z_o = z_obs[valid] x1 = (1.0 + z_min) / (1.0 + z_o) x2 = (1.0 + z_source) / (1.0 + z_o) eps = 1e-9 x1 = np.clip(x1, eps, 1.0 - eps) x2 = np.clip(x2, eps, 1.0 - eps) # Line-centre Gunn-Peterson optical depth at z_obs (Madau 1995 form). h = float(cosmo.h) Ob_h2 = float(cosmo.Ob0) * h * h Om_h2 = float(cosmo.Om0) * h * h tau_GP_z = ( 6.45e5 * (Ob_h2 / 0.022) * (Om_h2 / 0.13) ** -0.5 * ((1.0 + z_o) / 10.0) ** 1.5 ) out[valid] = ( tau_GP_z * x_HI * (_R_ALPHA / np.pi) * (_IGM_I(x2) - _IGM_I(x1)) ) return np.maximum(out, 0.0)
# -------------------------------------------------------------------------- # Voigt-Hjerting function (exact via Faddeeva) # --------------------------------------------------------------------------
[docs] def voigt_H(a: float, u: np.ndarray) -> np.ndarray: """Voigt-Hjerting function H(a, u) via the Faddeeva function. Computes the exact H(a, u) = Re[w(u + i*a)] where w(z) is the Faddeeva function. This is the function that Tepper-Garcia (2006) approximates analytically; here we use the full solution. Parameters ---------- a : float Voigt damping parameter. u : np.ndarray Dimensionless frequency offset from line centre. Returns ------- np.ndarray H(a, u) at each u. """ z = u + 1j * a return wofz(z).real
# -------------------------------------------------------------------------- # DLA optical depth # --------------------------------------------------------------------------
[docs] def tau_DLA( wave_A: np.ndarray, log_NHI: float, z: float = 0.0, b_kms: float = _B_DEFAULT_KMS, ) -> np.ndarray: """Compute DLA optical depth at each wavelength. Implements Eq. 1 of Pollock et al. (2026): tau_DLA = N_HI * sigma_0 * H(a, u) where sigma_0 = sqrt(pi) * e^2 * f_alpha / (m_e * c * Delta_nu_D), a = Gamma_alpha / (4 pi Delta_nu_D), and u is the dimensionless frequency offset from Lya. Parameters ---------- wave_A : np.ndarray Observed-frame wavelength in Angstrom. log_NHI : float log10(N_HI / cm^-2). z : float Source redshift (0 for rest-frame spectra). b_kms : float Doppler parameter in km/s (default 30). Returns ------- np.ndarray Optical depth tau(lambda). """ NHI = 10.0 ** log_NHI # The Voigt absorption profile is a property of the absorber's rest # frame. An observed-frame photon at wave_A interacts with the gas # at rest-frame wavelength wave_A / (1+z), so we evaluate everything # in the rest frame of the gas. Using observed-frame frequencies # would inflate the wing optical depth by (1+z)^2 because both # sigma_0 ~ 1/Delta_nu_D and a (= gamma/(4 pi Delta_nu_D)) scale # with (1+z) when Delta_nu_D is built from the observed-frame # Lyalpha frequency. b_cms = b_kms * 1e5 # km/s -> cm/s lambda_0_cm = _LAMBDA_LYA_A * 1e-8 # rest-frame Lya in cm nu_0 = _C_CGS / lambda_0_cm # rest-frame Lya freq delta_nu_D = (b_cms / _C_CGS) * nu_0 # rest-frame Doppler width # Voigt damping parameter (rest-frame). a = _GAMMA_ALPHA / (4.0 * np.pi * delta_nu_D) # Rest-frame frequency offset of each pixel. wave_rest_cm = np.asarray(wave_A) / (1.0 + z) * 1e-8 nu = _C_CGS / wave_rest_cm u = (nu - nu_0) / delta_nu_D # Line-centre cross section. sigma_0 = ( np.sqrt(np.pi) * _E_CGS ** 2 * _F_ALPHA / (_ME_CGS * _C_CGS * delta_nu_D) ) H = voigt_H(a, u) tau = NHI * sigma_0 * H return np.maximum(tau, 0.0)
# -------------------------------------------------------------------------- # Spectral resolution convolution # -------------------------------------------------------------------------- def _convolve_resolution( wave_A: np.ndarray, flux: np.ndarray, R, ) -> np.ndarray: """Convolve a spectrum with a Gaussian LSF at resolving power R. Parameters ---------- wave_A : np.ndarray Wavelength array (must be uniformly or near-uniformly spaced). flux : np.ndarray Flux array to convolve. R : float, callable, or array_like Spectral resolving power (λ / FWHM). If a callable, it is invoked as ``R(lam_um)`` (wavelength in microns) and must return an array the same shape as the input. If an array, it must already be evaluated on ``wave_A``. If a scalar, a single Gaussian kernel at the median wavelength is used. Returns ------- np.ndarray Convolved flux array (same length as input). """ wave_A = np.asarray(wave_A, dtype=float) flux = np.asarray(flux, dtype=float) # Resolve R into either a scalar or a per-pixel array. if callable(R): R_arr = np.asarray(R(wave_A * 1e-4), dtype=float) elif np.isscalar(R): R_arr = None else: R_arr = np.asarray(R, dtype=float) if R_arr.shape != wave_A.shape: raise ValueError( "R array must have same shape as wave_A " f"({R_arr.shape} != {wave_A.shape})" ) if R_arr is None: # Single-kernel fast path (constant R). dlam = np.median(np.diff(wave_A)) lam_mid = np.median(wave_A) sigma_A = (lam_mid / float(R)) / 2.3548 sigma_pix = sigma_A / dlam hw = int(4.0 * sigma_pix) + 1 x = np.arange(-hw, hw + 1) kernel = np.exp(-0.5 * (x / sigma_pix) ** 2) kernel /= kernel.sum() padded = np.pad(flux, hw, mode="edge") convolved = np.convolve(padded, kernel, mode="same") return convolved[hw:-hw] # Wavelength-dependent kernel: build a per-pixel Gaussian and apply. sigma_A = (wave_A / R_arr) / 2.3548 dlam = np.median(np.diff(wave_A)) sigma_max = float(np.max(sigma_A)) hw = int(4.0 * sigma_max / dlam) + 1 out = np.empty_like(flux) pad = np.pad(flux, hw, mode="edge") # Offsets in wavelength relative to each pixel (uniform grid assumed). offsets = np.arange(-hw, hw + 1) * dlam for i in range(len(wave_A)): kernel = np.exp(-0.5 * (offsets / sigma_A[i]) ** 2) kernel /= kernel.sum() out[i] = np.dot(pad[i:i + 2 * hw + 1], kernel) return out # -------------------------------------------------------------------------- # Emission line masking # -------------------------------------------------------------------------- def _mask_emission_lines( wave_A: np.ndarray, z: float = 0.0, width_A: float = 10.0, exclude: set[str] | None = None, ) -> np.ndarray: """Create a boolean mask that is True for pixels to *keep*. Parameters ---------- wave_A : np.ndarray Observed-frame wavelength array. z : float Source redshift. width_A : float Half-width of each mask region in rest-frame Angstrom. exclude : set of str, optional Line names to skip (not mask). Returns ------- np.ndarray Boolean array (True = keep, False = masked). """ keep = np.ones(len(wave_A), dtype=bool) skip = exclude or set() for name, lam_rest in REST_LINES_A.items(): if name in skip: continue lam_obs = lam_rest * (1.0 + z) width_obs = width_A * (1.0 + z) keep &= (wave_A < lam_obs - width_obs) | (wave_A > lam_obs + width_obs) return keep # -------------------------------------------------------------------------- # Dust correction helpers # -------------------------------------------------------------------------- def _apply_dust_correction( wave_A: np.ndarray, flux: np.ndarray, flux_err: np.ndarray, Av: float, dust_law: str, Rv: float, ) -> tuple[np.ndarray, np.ndarray]: """Apply dust correction to flux and errors. Parameters ---------- wave_A : np.ndarray Wavelength array in Angstrom. flux : np.ndarray Observed flux. flux_err : np.ndarray Observed flux errors. Av : float V-band extinction. dust_law : str ``"cardelli"`` or ``"salim"``. Rv : float Total-to-selective extinction ratio. Returns ------- tuple of np.ndarray (corrected_flux, corrected_err). """ if Av == 0.0: return flux.copy(), flux_err.copy() if dust_law == "cardelli": from jwspecabund.dust import cardelli_extinction A_lambda = cardelli_extinction(wave_A, Av, Rv=Rv) elif dust_law == "salim": from jwspecabund.dust import salim_attenuation A_lambda = salim_attenuation(wave_A, Av, Rv=Rv) else: raise ValueError(f"Unknown dust_law: {dust_law!r}. Use 'cardelli' or 'salim'.") correction = 10.0 ** (0.4 * A_lambda) return flux * correction, flux_err * correction # -------------------------------------------------------------------------- # Result dataclass # --------------------------------------------------------------------------
[docs] @dataclass class DLAResult: """Result of a DLA column density fit. Attributes ---------- log_NHI : float Median posterior log10(N_HI / cm^-2). log_NHI_err : tuple of float (lower, upper) 68% CI half-widths. beta_UV : float Median posterior UV spectral slope. beta_UV_err : tuple of float (lower, upper) 68% CI half-widths. log_F0 : float Median posterior log10(F0) continuum normalisation. log_F0_err : tuple of float (lower, upper) 68% CI half-widths. Sigma_HI : float H I gas surface density in M_sun pc^-2 (from log_NHI). samples : dict Full posterior samples: {"log_NHI": arr, "beta_UV": arr, "log_F0": arr}. wave_fit : np.ndarray Wavelengths used in fit (after masking), observed frame. flux_fit : np.ndarray Dust-corrected fluxes used in fit. flux_err_fit : np.ndarray Dust-corrected flux errors used in fit. model_best : np.ndarray Best-fit model evaluated on wave_fit. z : float Redshift used in fit. Av : float Dust correction applied. log_evidence : float Log-evidence (logZ) from dynesty. """ log_NHI: float log_NHI_err: tuple[float, float] beta_UV: float beta_UV_err: tuple[float, float] log_F0: float log_F0_err: tuple[float, float] Sigma_HI: float samples: dict[str, np.ndarray] = field(repr=False) wave_fit: np.ndarray = field(repr=False) flux_fit: np.ndarray = field(repr=False) flux_err_fit: np.ndarray = field(repr=False) model_best: np.ndarray = field(repr=False) z: float = 0.0 Av: float = 0.0 log_evidence: float = 0.0 _lya_subtracted: bool = False lya_params: np.ndarray | None = None # --- IGM damping-wing extension (Pollock+26) --- x_HI: float = 0.0 x_HI_err: tuple[float, float] = (0.0, 0.0) fit_x_HI: bool = False igm_z_min: float = _IGM_Z_MIN_DEFAULT # --- Upper-limit reporting for unconstrained sources --- log_NHI_upper95: float = 0.0 is_upper_limit: bool = False # --- Continuum pivot wavelength used in F0 * (lam/lam_pivot)^beta_UV --- # None means the legacy default 1500 * (1+z) Å is in effect. lam_pivot_A: float | None = None
[docs] def summary(self) -> str: """Return a formatted summary string.""" lines = [ "DLA Fit Result (Pollock+26-style joint sampling)", "=" * 50, ] if self.is_upper_limit: lines.append( f"log(N_HI/cm^-2) < {self.log_NHI_upper95:.2f} (95% upper limit)" ) else: lines.append( f"log(N_HI/cm^-2) = {self.log_NHI:.2f} " f"(+{self.log_NHI_err[1]:.2f}, -{self.log_NHI_err[0]:.2f})" ) lines.extend([ f"Sigma_HI = {self.Sigma_HI:.1f} Msun/pc^2", f"beta_UV = {self.beta_UV:.2f} " f"(+{self.beta_UV_err[1]:.2f}, -{self.beta_UV_err[0]:.2f})", f"log(F0) = {self.log_F0:.2f} " f"(+{self.log_F0_err[1]:.2f}, -{self.log_F0_err[0]:.2f})", ]) if self.fit_x_HI: lines.append( f"x_HI (IGM) = {self.x_HI:.2f} " f"(+{self.x_HI_err[1]:.2f}, -{self.x_HI_err[0]:.2f})" ) if self.lya_params is not None: lp = self.lya_params lines.append( f"Lya A_peak = {lp[0]:.2e} " f"sigma={lp[2]:.2f} A alpha={lp[3]:.2f}" ) lines.append( f"Lya fitted = {'log_A_lya' in self.samples}" ) lines.extend([ f"z = {self.z}", f"Av = {self.Av}", f"log(Z) = {self.log_evidence:.1f}", f"N pixels = {len(self.wave_fit)}", f"N samples = {len(self.samples['log_NHI'])}", ]) return "\n".join(lines)
[docs] def plot( self, ax: Any = None, show_residuals: bool = True, flux_unit: str = "fnu", **kwargs: Any, ) -> Any: """Plot the DLA fit over the data. Parameters ---------- ax : matplotlib Axes, optional Axes to plot on. If None, creates a new figure. If show_residuals is True, this is ignored and a new figure with two panels is created. show_residuals : bool If True, show a residual panel below the main plot. flux_unit : str ``"fnu"`` for F_nu (default, same units as input) or ``"flam"`` for F_lambda (converted via F_lam = F_nu * c / lam^2). **kwargs Passed to the data plot (e.g. ``color``, ``alpha``). Returns ------- matplotlib Figure The figure object. """ import matplotlib.pyplot as plt if show_residuals: # constrained layout keeps tidy margins while respecting the # small hspace; tight_layout() would conflict with the manual # gridspec spacing and warn. fig, (ax_main, ax_res) = plt.subplots( 2, 1, figsize=(8, 5), sharex=True, gridspec_kw={"height_ratios": [3, 1], "hspace": 0.05}, layout="constrained", ) _own_fig = True else: if ax is None: fig, ax_main = plt.subplots(figsize=(8, 4)) _own_fig = True else: ax_main = ax fig = ax.get_figure() _own_fig = False ax_res = None # Convert to rest frame for display. wave_rest = self.wave_fit / (1.0 + self.z) # Unit conversion factor. if flux_unit == "flam": conv = 1.0 / (self.wave_fit ** 2) conv = conv / np.median(conv) ylabel = r"$F_\lambda$ (relative)" else: conv = np.ones_like(self.wave_fit) ylabel = r"$F_\nu$ (flux density)" flux_plot = self.flux_fit * conv err_plot = self.flux_err_fit * conv model_plot = self.model_best * conv # Data. _data_label = (r"Data (Ly$\alpha$-subtracted)" if self._lya_subtracted else "Data") data_kw = {"color": "k", "lw": 0.8, "alpha": 0.6, "label": _data_label} data_kw.update(kwargs) ax_main.step(wave_rest, flux_plot, where="mid", **data_kw) # Error band. ax_main.fill_between( wave_rest, flux_plot - err_plot, flux_plot + err_plot, color="grey", alpha=0.2, step="mid", ) # --- Component decomposition --- lam_pivot = ( self.lam_pivot_A if self.lam_pivot_A is not None else _LAMBDA_PIVOT_A * (1.0 + self.z) ) continuum = 10.0 ** self.log_F0 * (self.wave_fit / lam_pivot) ** self.beta_UV lya_obs = _LAMBDA_LYA_A * (1.0 + self.z) _R = getattr(self, '_R', None) # 1) Intrinsic continuum (no DLA, no IGM). ax_main.plot( wave_rest, continuum * conv, color="blue", lw=1, ls="--", alpha=0.5, label=rf"Intrinsic cont. ($\beta_{{UV}}={self.beta_UV:.2f}$)", ) # 2) Continuum × DLA × IGM (no Lyα). tau = tau_DLA(self.wave_fit, self.log_NHI, z=self.z) cont_dla = continuum * np.exp(-tau) cont_dla = np.where(self.wave_fit < lya_obs, 0.0, cont_dla) if _R is not None: cont_dla = _convolve_resolution(self.wave_fit, cont_dla, _R) ax_main.plot( wave_rest, cont_dla * conv, color="purple", lw=1.5, alpha=0.8, label=rf"Cont. $\times$ DLA ($\log N_{{HI}}={self.log_NHI:.1f}$)", ) # 3) Lyα component (convolved, with IGM cutoff). if self.lya_params is not None: from .models import asymmetric_gaussian from .io import _flam_to_ujy lp = self.lya_params lya_flam = asymmetric_gaussian( self.wave_fit, lp[0], lp[1], lp[2], lp[3], ) lya_ujy = _flam_to_ujy(lya_flam, self.wave_fit * 1e-4) lya_ujy = np.where(self.wave_fit < lya_obs, 0.0, lya_ujy) if _R is not None: lya_ujy = _convolve_resolution(self.wave_fit, lya_ujy, _R) ax_main.plot( wave_rest, lya_ujy * conv, color="green", lw=1.5, alpha=0.8, label=r"Ly$\alpha$ (escaped)", ) # Lya marker. lya_rest = _LAMBDA_LYA_A ax_main.axvline(lya_rest, color="orange", ls=":", alpha=0.5, lw=1) ax_main.text( lya_rest + 5, ax_main.get_ylim()[1] * 0.9, r"Ly$\alpha$", color="orange", fontsize=9, ) # Set y-axis from data, not the error envelope. finite = np.isfinite(flux_plot) if finite.any(): ylo = float(np.nanpercentile(flux_plot[finite], 2)) yhi = float(np.nanpercentile(flux_plot[finite], 98)) margin = 0.15 * max(yhi - ylo, abs(yhi) * 0.1) ax_main.set_ylim(ylo - margin, yhi + margin) ax_main.set_ylabel(ylabel) ax_main.legend(fontsize=9, frameon=False) ax_main.set_title( rf"$\log(N_{{\rm HI}}/\mathrm{{cm}}^{{-2}}) = " rf"{self.log_NHI:.2f}^{{+{self.log_NHI_err[1]:.2f}}}" rf"_{{-{self.log_NHI_err[0]:.2f}}}$", fontsize=11, ) if ax_res is not None: residuals = self.flux_fit - self.model_best normalised = residuals / self.flux_err_fit ax_res.step(wave_rest, normalised, where="mid", color="k", lw=0.5) ax_res.axhline(0, color="red", ls="-", lw=0.5) ax_res.fill_between( wave_rest, -1, 1, color="grey", alpha=0.15, step="mid", ) ax_res.set_ylabel(r"Residual ($\sigma$)") ax_res.set_xlabel(r"Rest wavelength ($\mathrm{\AA}$)") ax_res.set_ylim(-5, 5) else: ax_main.set_xlabel(r"Rest wavelength ($\mathrm{\AA}$)") # Single-panel figures we created ourselves benefit from # tight_layout; the 2-panel case already uses constrained layout, # and a user-supplied ax's figure must not be relaid out here. if ax_res is None and _own_fig: fig.tight_layout() return fig
[docs] def corner(self, **kwargs: Any) -> Any: """Plot a corner plot of the posterior samples. Requires the ``corner`` package. Parameters ---------- **kwargs Passed to ``corner.corner()``. Returns ------- matplotlib Figure The corner plot figure. """ import corner as corner_pkg cols = [self.samples["log_NHI"]] labels = [r"$\log(N_{\rm HI}/\mathrm{cm}^{-2})$"] truths = [self.log_NHI] # Include x_HI if it was sampled (i.e. not fixed at 0). if self.fit_x_HI and "x_HI" in self.samples and np.std(self.samples["x_HI"]) > 1e-10: cols.append(self.samples["x_HI"]) labels.append(r"$x_{\rm HI}$") truths.append(self.x_HI) # Only include beta/F0 if they have a real posterior (not fixed). beta_samples = self.samples["beta_UV"] if np.std(beta_samples) > 1e-10: cols.append(beta_samples) cols.append(self.samples["log_F0"]) labels.extend([r"$\beta_{UV}$", r"$\log(F_0)$"]) truths.extend([self.beta_UV, self.log_F0]) # Include Lyα params if fitted jointly. if "log_A_lya" in self.samples: cols.append(self.samples["log_A_lya"]) cols.append(self.samples["sigma_lya"]) cols.append(self.samples["alpha_lya"]) labels.extend([ r"$\log(A_{\rm Ly\alpha})$", r"$\sigma_{\rm Ly\alpha}$ (\AA)", r"$\alpha_{\rm Ly\alpha}$", ]) truths.extend([ float(np.median(self.samples["log_A_lya"])), float(np.median(self.samples["sigma_lya"])), float(np.median(self.samples["alpha_lya"])), ]) if len(cols) == 1: # 1D posterior — corner can't handle it, plot histogram. import matplotlib.pyplot as plt fig, ax = plt.subplots(figsize=(5, 4)) ax.hist(cols[0], bins=50, density=True, color="steelblue", alpha=0.7) ax.axvline(truths[0], color="red", lw=1.5, label="Median") lo = np.percentile(cols[0], 16) hi = np.percentile(cols[0], 84) ax.axvline(lo, color="grey", ls="--", lw=1) ax.axvline(hi, color="grey", ls="--", lw=1) ax.set_xlabel(labels[0], fontsize=12) ax.set_ylabel("Posterior density") ax.set_title( f"${np.median(cols[0]):.2f}^{{+{hi - np.median(cols[0]):.2f}}}" f"_{{-{np.median(cols[0]) - lo:.2f}}}$", fontsize=12, ) ax.legend(fontsize=9) fig.tight_layout() return fig data = np.column_stack(cols) defaults = dict( labels=labels, truths=truths, show_titles=True, title_kwargs={"fontsize": 11}, quantiles=[0.16, 0.5, 0.84], ) defaults.update(kwargs) fig = corner_pkg.corner(data, **defaults) return fig
# -------------------------------------------------------------------------- # Model evaluation (numpy — used by dynesty likelihood) # -------------------------------------------------------------------------- def _evaluate_model( wave_A: np.ndarray, log_F0: float, beta_UV: float, log_NHI: float, z: float, R=None, b_kms: float = _B_DEFAULT_KMS, x_HI: float = 0.0, igm_z_min: float = _IGM_Z_MIN_DEFAULT, lya_ujy_func: object | None = None, lya_free_params: tuple[float, float, float] | None = None, emission_lines: list[tuple[float, float, float]] | None = None, lam_pivot_A: float | None = None, ) -> np.ndarray: """Evaluate the joint DLA + IGM damping-wing model. Implements Pollock et al. (2026) Eq. 4 — the intrinsic continuum (single power-law) plus any Lyα emission is attenuated by the local DLA and the IGM damping wing simultaneously: F(λ) = [F0 (λ/λ_pivot)^β + Lyα(λ)] · exp(−τ_DLA) · exp(−τ_IGM) Parameters ---------- wave_A : np.ndarray Observed-frame wavelengths in Angstrom. log_F0 : float log10 of the continuum normalisation at the (observer-frame) pivot wavelength λ_pivot · (1 + z), in the same flux units as the input data (typically µJy). beta_UV : float UV spectral slope of the intrinsic continuum. log_NHI : float log10(N_HI / cm^-2) of the local DLA. z : float Source redshift. R : float or None Spectral resolving power. If given, the model is convolved with a Gaussian LSF of FWHM = λ / R. b_kms : float Doppler parameter in km/s used in the Voigt profile. The damping wing is insensitive to ``b`` for log N_HI > 20.3. x_HI : float Volume-averaged neutral fraction of the IGM (Miralda-Escudé damping-wing term). Default 0 (no IGM contribution). igm_z_min : float Lower-redshift bound of the IGM integration (default 5.3, Bosman+22 reionisation end). lya_ujy_func : callable or None Function returning a fixed Lyα emission profile in µJy. Added to the intrinsic continuum before attenuation. lya_free_params : tuple of (log_A, sigma, alpha), optional Free Lyα parameters: log10(A_peak) in F_λ (erg/s/cm²/Å), sigma in Å, and skewness α. Centroid fixed at Lyα rest × (1+z). The profile is added to the continuum and attenuated by both DLA and IGM (Pollock+26 convention). Returns ------- np.ndarray Model flux at each wavelength. """ from .io import _flam_to_ujy from .models import asymmetric_gaussian F0 = 10.0 ** log_F0 lam_pivot = lam_pivot_A if lam_pivot_A is not None else _LAMBDA_PIVOT_A * (1.0 + z) continuum = F0 * (wave_A / lam_pivot) ** beta_UV # Build the intrinsic spectrum: continuum + (optional) Lyα emission + # (optional) fixed Gaussian emission lines (Pollock+26 Eq. 4 convention: # the lines are added *before* DLA+IGM attenuation). intrinsic = continuum.copy() if lya_ujy_func is not None: intrinsic = intrinsic + lya_ujy_func(wave_A) if lya_free_params is not None: log_A, sigma, alpha = lya_free_params A_peak = 10.0 ** log_A mu = _LAMBDA_LYA_A * (1.0 + z) lya_flam = asymmetric_gaussian(wave_A, A_peak, mu, sigma, alpha) intrinsic = intrinsic + _flam_to_ujy(lya_flam, wave_A * 1e-4) if emission_lines is not None: for A_peak, mu_obs, sigma_obs in emission_lines: intrinsic = intrinsic + A_peak * np.exp( -0.5 * ((wave_A - mu_obs) / sigma_obs) ** 2 ) # DLA + IGM attenuation (Pollock+26 Eq. 4). tau_total = tau_DLA(wave_A, log_NHI, z=z, b_kms=b_kms) if x_HI > 0.0: tau_total = tau_total + tau_IGM_DW( wave_A, x_HI, z_source=z, z_min=igm_z_min, ) model = intrinsic * np.exp(-tau_total) # Hard zero blueward of source Lyα — the GP forest is essentially # saturated there. The Miralda-Escudé damping wing only models the # red side of source Lyα; the formula is not applicable to forest # photons that would have hit Lyα at z < z_source. lya_obs = _LAMBDA_LYA_A * (1.0 + z) model = np.where(wave_A < lya_obs, 0.0, model) if R is not None: model = _convolve_resolution(wave_A, model, R) return model # -------------------------------------------------------------------------- # Main fitting function # --------------------------------------------------------------------------
[docs] def fit_NHI( wave_A: np.ndarray, flux: np.ndarray, flux_err: np.ndarray, z: float = 0.0, *, fit_x_HI: bool = False, igm_z_min: float = _IGM_Z_MIN_DEFAULT, b_kms: float = _B_DEFAULT_KMS, continuum: np.ndarray | None = None, Av: float = 0.0, dust_law: str = "cardelli", Rv: float = 3.1, R=None, mask_lines: bool = True, mask_width_A: float = 20.0, mask_lya_emission_width_A: float = 8.0, fit_range_A: tuple[float, float] = (1216.0, 3000.0), mask_regions_A: list[tuple[float, float]] | None = None, fit_lya: bool = False, lya_params: np.ndarray | list | None = None, emission_lines: list[tuple[float, float, float]] | None = None, lam_pivot_A: float | None = None, prior_log_NHI: tuple[float, float] = (18.0, 24.0), prior_x_HI: tuple[float, float] = (0.0, 1.0), prior_beta_UV: tuple[float, float] = (-4.0, 0.0), prior_log_F0: tuple[float, float] | None = None, prior_log_A_lya: tuple[float, float] = (-22.0, -15.0), prior_sigma_lya: tuple[float, float] = (1.0, 30.0), prior_alpha_lya: tuple[float, float] = (0.0, 5.0), n_live: int = 500, dlogz: float = 0.5, seed: int = 42, ) -> DLAResult: """Joint Bayesian fit of the DLA + IGM damping wing (Pollock+26-style). Implements the Pollock et al. (2026) joint nested-sampling fit over (log_F0, β_UV, log_NHI[, x_HI[, Lyα]]) with an exact Voigt-Hjerting profile and the Miralda-Escudé (1998) IGM damping wing in the Totani+06 closed form. Parameters ---------- wave_A, flux, flux_err : np.ndarray Wavelength (observed-frame Å) and flux density arrays. z : float Source redshift (use 0 for rest-frame stacks). fit_x_HI : bool If True, sample over IGM neutral fraction x_HI in addition to the local DLA. Recommended at z > 6. igm_z_min : float Lower-redshift bound of the IGM integration (default 5.3, Bosman+22 reionisation end). b_kms : float Doppler parameter (km/s) for the Voigt profile. The damping wing is insensitive to b for log N_HI > 20.3. continuum : np.ndarray, optional Pre-smoothed continuum estimate. When provided the model is fit against this smoothed array instead of the raw flux — emission lines are then implicitly removed. Av, dust_law, Rv : float, str, float Optional foreground dust correction applied before fitting. R : float or None Spectral resolving power for LSF convolution. mask_lines : bool If True (default), mask known UV emission lines. Lyα, the NV λλ1238.8, 1242.8 doublet, and any `abs_*` low-ionisation absorption features are excluded from this mask: Lyα and NV sit inside the damping wing (1216–1263 Å rest) where the N_HI constraint lives, and any DLA strong enough to fit absorbs NV at τ ≫ 1 (no emission left to contaminate); `abs_*` features are foreground-gas signatures aligned with the DLA. mask_width_A : float Half-width of the general emission-line mask (rest-frame Å). Default 20 Å is appropriate for NIRSpec PRISM resolution. mask_lya_emission_width_A : float Half-width of the *narrow* Lyα-emission core mask in rest-frame Å (default 8 Å). Set this small enough not to eat the damping wing. Ignored when ``fit_lya=True``. fit_range_A : tuple Rest-frame fit window (default 1216–3000 Å, matching Pollock+26 — Lyα to just below the Balmer break). mask_regions_A : list of (float, float), optional Extra rest-frame wavelength ranges to ignore. fit_lya : bool Jointly fit a skewed Lyα emission profile. lya_params : array of length 4, optional Pre-determined Lyα profile (A_peak, μ, σ, α) added to the intrinsic spectrum before attenuation. prior_log_NHI, prior_x_HI, prior_beta_UV : tuple Uniform-prior bounds for the named parameter (Pollock+26 Sect. 3.2 defaults: log_NHI ∈ [18,24], x_HI ∈ [0,1], β_UV ∈ [-4,0]). prior_log_F0 : tuple, optional Uniform prior bounds on log10(F0) in the same flux units as the input data. Defaults to ±3 dex around the median continuum flux derived from the input spectrum. prior_log_A_lya, prior_sigma_lya, prior_alpha_lya : tuple Priors used when ``fit_lya=True``. n_live : int Number of dynesty live points (default 500). dlogz : float dynesty convergence threshold on log Z (default 0.5). seed : int RNG seed. Returns ------- DLAResult Posterior samples for every free parameter, median ± 16/84 percentiles, log-evidence, and a 95th-percentile upper limit on log N_HI when the posterior is unconstrained. Notes ----- - Pollock+26 Eq. 4: F = (F0·(λ/λ_pivot)^β + Lyα) · exp(−τ_DLA) · exp(−τ_IGM). - Lyα emission is added *before* attenuation (Pollock+26 convention). - Reports an upper limit when the posterior touches the prior lower bound (Pollock+26 do this for 21/48 sources). """ import dynesty from dynesty.utils import resample_equal wave_A = np.asarray(wave_A, dtype=np.float64) flux = np.asarray(flux, dtype=np.float64) flux_err = np.asarray(flux_err, dtype=np.float64) # --- Continuum mode: fit to smooth continuum estimate --- _use_continuum = continuum is not None if _use_continuum: continuum = np.asarray(continuum, dtype=np.float64) fit_lya = False lya_params = None # --- Dust correction --- if _use_continuum: flux_corr, err_corr = _apply_dust_correction( wave_A, continuum, flux_err, Av, dust_law, Rv, ) else: flux_corr, err_corr = _apply_dust_correction( wave_A, flux, flux_err, Av, dust_law, Rv, ) # --- Build fixed Lya model (only when fit_lya=False) --- _lya_ujy_func = None if lya_params is not None and not fit_lya: lp = np.asarray(lya_params) if len(lp) != 4: raise ValueError( f"lya_params must have 4 elements [A_peak, mu, sigma, alpha], " f"got {len(lp)}." ) from .models import asymmetric_gaussian from .io import _flam_to_ujy def _lya_ujy_func(w): lya_flam = asymmetric_gaussian(w, lp[0], lp[1], lp[2], lp[3]) return _flam_to_ujy(lya_flam, w * 1e-4) logger.info( "Including fixed Lya: A=%.2e, mu=%.1f, sigma=%.1f, alpha=%.1f", lp[0], lp[1], lp[2], lp[3], ) # --- Rest-frame wavelength range selection --- wave_rest = wave_A / (1.0 + z) in_range = (wave_rest >= fit_range_A[0]) & (wave_rest <= fit_range_A[1]) lya_obs = _LAMBDA_LYA_A * (1.0 + z) # --- Emission line masking --- # Lyα, NV, and any low-ionisation absorption (`abs_*`) lines are # excluded from the general mask because: # (a) the broad Lyα damping wing is the actual DLA signal — # masking ±20 Å around 1216 Å hides exactly the pixels that # constrain N_HI; # (b) NV λλ1238.8, 1242.8 sits at Δλ_rest ≈ 23–27 Å from Lyα, # inside the wing. Any DLA strong enough to leave a # measurable absorption signature (log N_HI ≳ 20) absorbs the # NV doublet at τ ≫ 1 — there is no NV emission left to # contaminate, so masking it would only destroy the # wing-discriminating pixels (cf. fitter table: log N_HI = 21 # gives a 40% wing depth at +30 Å rest, so masking ±20 Å # around NV erases the 1219–1263 Å window that distinguishes # log N_HI = 20 from 22); # (c) `abs_*` features are foreground neutral-gas signatures # aligned with the DLA, not contamination. # Lyα emission, when present, gets a separate narrow mask controlled # by ``mask_lya_emission_width_A``. if mask_lines and not _use_continuum: _exclude = {"Lya", "NV_1", "NV_2", "NV_doublet"} for _name in REST_LINES_A: if _name.startswith("abs_"): _exclude.add(_name) line_mask = _mask_emission_lines( wave_A, z=z, width_A=mask_width_A, exclude=_exclude, ) if not fit_lya: lya_em_width = mask_lya_emission_width_A * (1.0 + z) line_mask &= ( (wave_A < lya_obs - lya_em_width) | (wave_A > lya_obs + lya_em_width) ) else: line_mask = np.ones(len(wave_A), dtype=bool) if mask_regions_A: for lo, hi in mask_regions_A: line_mask &= (wave_A < lo * (1.0 + z)) | (wave_A > hi * (1.0 + z)) good = (err_corr > 0) & np.isfinite(flux_corr) use = in_range & line_mask & good if use.sum() < 10: raise ValueError( f"Only {use.sum()} pixels remain after masking. " "Check fit_range_A, mask_width_A, and data quality." ) w = wave_A[use] f = flux_corr[use] e = err_corr[use] inv_var = 1.0 / e ** 2 _mode_bits = ["joint"] if fit_x_HI: _mode_bits.append("DLA+IGM") else: _mode_bits.append("DLA-only") if fit_lya: _mode_bits.append("+Lyα") if _use_continuum: _mode_bits.append("continuum") logger.info( "DLA fit (%s): %d pixels in [%.0f, %.0f] Å rest-frame " "(z=%.3f, Av=%.2f).", " ".join(_mode_bits), len(w), fit_range_A[0], fit_range_A[1], z, Av, ) # --- Construct the prior --- # Default log_F0 prior centred on data median if user did not pass one. if prior_log_F0 is None: f_med = float(np.nanmedian(np.maximum(f, 1e-30))) log_F0_centre = float(np.log10(max(f_med, 1e-30))) prior_log_F0 = (log_F0_centre - 3.0, log_F0_centre + 3.0) pri_lo = [prior_log_F0[0], prior_beta_UV[0], prior_log_NHI[0]] pri_hi = [prior_log_F0[1], prior_beta_UV[1], prior_log_NHI[1]] param_names = ["log_F0", "beta_UV", "log_NHI"] if fit_x_HI: pri_lo.append(prior_x_HI[0]) pri_hi.append(prior_x_HI[1]) param_names.append("x_HI") if fit_lya: pri_lo.extend([prior_log_A_lya[0], prior_sigma_lya[0], prior_alpha_lya[0]]) pri_hi.extend([prior_log_A_lya[1], prior_sigma_lya[1], prior_alpha_lya[1]]) param_names.extend(["log_A_lya", "sigma_lya", "alpha_lya"]) pri_lo = np.asarray(pri_lo) pri_hi = np.asarray(pri_hi) pri_range = pri_hi - pri_lo ndim = len(pri_lo) def prior_transform(u): return pri_lo + u * pri_range def log_likelihood(theta): log_F0 = theta[0] beta_UV = theta[1] log_NHI = theta[2] idx = 3 x_HI = float(theta[idx]) if fit_x_HI else 0.0 if fit_x_HI: idx += 1 lya_free = tuple(theta[idx:idx + 3]) if fit_lya else None m = _evaluate_model( w, log_F0, beta_UV, log_NHI, z, R=R, b_kms=b_kms, x_HI=x_HI, igm_z_min=igm_z_min, lya_ujy_func=_lya_ujy_func if not fit_lya else None, lya_free_params=lya_free, emission_lines=emission_lines, lam_pivot_A=lam_pivot_A, ) resid = f - m return -0.5 * float(np.sum(resid * resid * inv_var)) # --- Run dynesty --- sampler = dynesty.NestedSampler( log_likelihood, prior_transform, ndim=ndim, nlive=n_live, rstate=np.random.default_rng(seed), ) sampler.run_nested(print_progress=True, dlogz=dlogz) results = sampler.results weights = np.exp(results.logwt - results.logz[-1]) samples = resample_equal(results.samples, weights) samples_dict: dict[str, np.ndarray] = { name: samples[:, i] for i, name in enumerate(param_names) } def _median_ci(arr): m = float(np.median(arr)) lo = m - float(np.percentile(arr, 16)) hi = float(np.percentile(arr, 84)) - m return m, (lo, hi) log_F0_med, log_F0_err = _median_ci(samples_dict["log_F0"]) beta_UV_med, beta_UV_err = _median_ci(samples_dict["beta_UV"]) log_NHI_med, log_NHI_err = _median_ci(samples_dict["log_NHI"]) if fit_x_HI: x_HI_med, x_HI_err = _median_ci(samples_dict["x_HI"]) else: x_HI_med, x_HI_err = 0.0, (0.0, 0.0) # Keep an empty x_HI sample for downstream compatibility. samples_dict["x_HI"] = np.zeros_like(samples_dict["log_NHI"]) # --- Upper-limit detection: posterior touches the prior lower bound --- nhi_p5 = float(np.percentile(samples_dict["log_NHI"], 5)) is_upper_limit = nhi_p5 <= prior_log_NHI[0] + 0.1 log_NHI_upper95 = float(np.percentile(samples_dict["log_NHI"], 95)) if is_upper_limit: logger.info( "log_NHI posterior is unconstrained — reporting 95%% upper " "limit log_NHI < %.2f.", log_NHI_upper95, ) # --- Surface density (Pollock+26 Eq. 7) --- Sigma_HI = 8e-21 * 10.0 ** log_NHI_med # --- Best-fit model on the full fit window (for plotting) --- lya_free_best = None if fit_lya: lya_free_best = ( float(np.median(samples_dict["log_A_lya"])), float(np.median(samples_dict["sigma_lya"])), float(np.median(samples_dict["alpha_lya"])), ) model_best = _evaluate_model( w, log_F0_med, beta_UV_med, log_NHI_med, z, R=R, b_kms=b_kms, x_HI=x_HI_med, igm_z_min=igm_z_min, lya_ujy_func=_lya_ujy_func if not fit_lya else None, lya_free_params=lya_free_best, emission_lines=emission_lines, lam_pivot_A=lam_pivot_A, ) log_evidence = float(results.logz[-1]) final_lya_params = np.asarray(lya_params) if lya_params is not None else None result = DLAResult( log_NHI=log_NHI_med, log_NHI_err=log_NHI_err, beta_UV=beta_UV_med, beta_UV_err=beta_UV_err, log_F0=log_F0_med, log_F0_err=log_F0_err, Sigma_HI=Sigma_HI, samples=samples_dict, wave_fit=np.array(w), flux_fit=np.array(f), flux_err_fit=np.array(e), model_best=model_best, z=z, Av=Av, log_evidence=log_evidence, _lya_subtracted=False, lya_params=final_lya_params, x_HI=x_HI_med, x_HI_err=x_HI_err, fit_x_HI=fit_x_HI, igm_z_min=igm_z_min, log_NHI_upper95=log_NHI_upper95, is_upper_limit=is_upper_limit, lam_pivot_A=lam_pivot_A, ) result._R = R return result
# -------------------------------------------------------------------------- # D_Lyα equivalent-width statistic (Heintz et al. 2025, Eq. 1) # --------------------------------------------------------------------------
[docs] def compute_D_Lya( wave_A: np.ndarray, flux: np.ndarray, flux_err: np.ndarray, z: float, *, cont_range_A: tuple[float, float] = (1250.0, 2600.0), int_range_A: tuple[float, float] = (1180.0, 1350.0), mask_lines: bool = True, mask_width_A: float = 20.0, n_mc: int = 1000, seed: int = 42, ) -> dict[str, float]: """Heintz+25 D_Lyα equivalent-width statistic (Eq. 1). Fits a power-law continuum F_cont(λ) = F_1550 (λ/1550)^β over *cont_range_A* (lines masked) then integrates D_Lyα = ∫ (1 − F_λ / F_cont(λ)) dλ / (1 + z) over *int_range_A* (rest-frame). D_Lyα < 0 indicates a Lyα emitter; ≳50 Å indicates a strong damped absorber. Parameters ---------- wave_A, flux, flux_err : np.ndarray Spectrum in observed-frame Å and arbitrary linear flux units. z : float Source redshift. cont_range_A : tuple Rest-frame range used to fit the power-law continuum (default 1250–2600 Å, matching Heintz+25). int_range_A : tuple Rest-frame range over which the EW integral is taken (default 1180–1350 Å, matching Heintz+25 Eq. 1). mask_lines : bool Whether to mask known UV emission lines from the continuum fit (default True). The integral is always taken over the full *int_range_A* — emission/absorption inside it is the signal. mask_width_A : float Half-width of line masks for the continuum fit (rest-frame Å). n_mc : int Monte-Carlo iterations for D_Lyα uncertainty propagation. seed : int RNG seed. Returns ------- dict ``{"D_Lya": float, "D_Lya_err": float, "beta_UV": float, "beta_UV_err": float, "F_1550": float, "F_1550_err": float}`` """ from scipy.optimize import curve_fit wave_A = np.asarray(wave_A, dtype=np.float64) flux = np.asarray(flux, dtype=np.float64) flux_err = np.asarray(flux_err, dtype=np.float64) wave_rest = wave_A / (1.0 + z) pivot = 1550.0 # rest-frame (Heintz+25) in_cont = (wave_rest >= cont_range_A[0]) & (wave_rest <= cont_range_A[1]) if mask_lines: line_mask = _mask_emission_lines(wave_A, z=z, width_A=mask_width_A) # Also mask the broad Lyα region from the continuum fit. lya_obs = _LAMBDA_LYA_A * (1.0 + z) line_mask &= ( (wave_A < lya_obs - 50.0 * (1.0 + z)) | (wave_A > lya_obs + 50.0 * (1.0 + z)) ) else: line_mask = np.ones(len(wave_A), dtype=bool) cont_use = in_cont & line_mask & (flux_err > 0) & np.isfinite(flux) if cont_use.sum() < 5: raise ValueError( f"Only {cont_use.sum()} continuum pixels in {cont_range_A} Å; " "cannot fit β_UV." ) def power_law(lam_rest, F_1550, beta): return F_1550 * (lam_rest / pivot) ** beta f_med = float(np.nanmedian(flux[cont_use])) popt, pcov = curve_fit( power_law, wave_rest[cont_use], flux[cont_use], p0=[max(f_med, 1e-30), -2.0], sigma=flux_err[cont_use], absolute_sigma=True, bounds=([1e-30, -5.0], [1e30, 1.0]), ) F_1550_med, beta_med = popt perr = np.sqrt(np.diag(pcov)) # Integral: D_Lyα = ∫ (1 - F/F_cont) dλ / (1+z), in rest-frame Å. in_int = (wave_rest >= int_range_A[0]) & (wave_rest <= int_range_A[1]) in_int &= (flux_err > 0) & np.isfinite(flux) if in_int.sum() < 3: raise ValueError( f"Only {in_int.sum()} pixels in EW integral range " f"{int_range_A} Å — increase the spectrum coverage." ) w_int = wave_rest[in_int] f_int = flux[in_int] fe_int = flux_err[in_int] order = np.argsort(w_int) w_int = w_int[order] f_int = f_int[order] fe_int = fe_int[order] rng = np.random.default_rng(seed) D_samples = np.empty(n_mc) for k in range(n_mc): F_1550_k = F_1550_med + perr[0] * rng.standard_normal() beta_k = beta_med + perr[1] * rng.standard_normal() f_k = f_int + fe_int * rng.standard_normal(len(f_int)) cont_k = F_1550_k * (w_int / pivot) ** beta_k # Avoid divide-by-zero at the rare positive-continuum dips. with np.errstate(divide="ignore", invalid="ignore"): integrand = np.where(cont_k > 0, 1.0 - f_k / cont_k, 0.0) D_samples[k] = _trapezoid(integrand, w_int) D_Lya_med = float(np.median(D_samples)) D_Lya_err = float(np.std(D_samples)) return { "D_Lya": D_Lya_med, "D_Lya_err": D_Lya_err, "beta_UV": float(beta_med), "beta_UV_err": float(perr[1]), "F_1550": float(F_1550_med), "F_1550_err": float(perr[0]), "n_pixels": int(in_int.sum()), }