Source code for jwspecabund.forward

"""Bayesian forward model for emission-line abundance determination.

Implements the forward modelling approach of Cullen et al. (2025): free
parameters (ionic abundances, T_e, n_e) are sampled via MCMC or nested
sampling.  For each parameter vector, CEL emissivities from PyNEB and
the Aller (1984) Hbeta recombination formula predict line flux ratios
that are compared to the observed ratios via a Gaussian likelihood.

References
----------
- Aller L. H., 1984, Physics of Thermal Gaseous Nebulae (Reidel)
- Cullen et al. (2025) — forward modelling for EXCELS galaxies
"""

from __future__ import annotations

import logging
from typing import Any

import numpy as np

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Physical constants and Aller (1984) Hbeta emissivity
# ---------------------------------------------------------------------------

# Planck constant * speed of light in CGS (erg cm).
_HC_CGS = 6.62607015e-27 * 2.99792458e10

# Hbeta vacuum wavelength in cm.
_LAMBDA_HB_CM = 4862.68e-8


[docs] def hbeta_emissivity_aller84(Te: float) -> float: r"""Hbeta volume emissivity coefficient using the Aller (1984) Case B formula. .. math:: \varepsilon_{H\beta} = \alpha_{H\beta}^{\rm eff}(T)\;h\nu_{H\beta} where :math:`\alpha_{H\beta}^{\rm eff} = 3.03\times10^{-14}\,(T/10^4)^{-0.874}` cm\ :sup:`3` s\ :sup:`-1`. Unlike PyNEB's recombination tables, this formula is valid at *all* temperatures — no upper-bound limitation at 30,000 K. Parameters ---------- Te : float Electron temperature in K. Returns ------- float :math:`\varepsilon_{H\beta}` in erg cm\ :sup:`3` s\ :sup:`-1`. """ alpha_hb = 3.03e-14 * (Te / 1e4) ** (-0.874) hnu_hb = _HC_CGS / _LAMBDA_HB_CM return alpha_hb * hnu_hb
# --------------------------------------------------------------------------- # N V (N4+) manual emissivity — PyNEB has no N 5 data # --------------------------------------------------------------------------- # Li-like N4+ (1s² 2s ²S₁/₂ ground state): # Level 1: 2s ²S₁/₂ (g = 2) # Level 2: 2p ²P₁/₂ (g = 2), λ₂₁ = 1242.804 Å, A₂₁ = 3.37×10⁸ s⁻¹ # Level 3: 2p ²P₃/₂ (g = 4), λ₃₁ = 1238.821 Å, A₃₁ = 3.40×10⁸ s⁻¹ # # Effective collision strengths Υ(T) from Aggarwal & Keenan (2004, # A&A, 427, 763) / CHIANTI v10. We store a table and interpolate. _NV_A21 = 3.37e8 # s⁻¹ _NV_A31 = 3.40e8 # s⁻¹ _NV_G = np.array([2, 2, 4]) # statistical weights _NV_WAVE = {2: 1242.804, 3: 1238.821} # Angstroms # Υ(T) table: log10(T) and effective collision strengths (Aggarwal & Keenan 2004) # Transitions: 1→2, 1→3, 2→3 _NV_LOGTEMP = np.array([3.5, 3.7, 3.9, 4.0, 4.1, 4.2, 4.3, 4.4, 4.5, 4.6, 4.7, 4.8, 4.9, 5.0, 5.2, 5.4]) _NV_UPS_12 = np.array([0.112, 0.126, 0.142, 0.151, 0.161, 0.174, 0.190, 0.210, 0.233, 0.258, 0.280, 0.295, 0.300, 0.296, 0.266, 0.224]) _NV_UPS_13 = np.array([0.224, 0.252, 0.283, 0.301, 0.322, 0.348, 0.380, 0.419, 0.466, 0.516, 0.559, 0.589, 0.600, 0.592, 0.532, 0.448]) _NV_UPS_23 = np.array([0.262, 0.291, 0.327, 0.349, 0.375, 0.408, 0.449, 0.500, 0.560, 0.624, 0.682, 0.725, 0.744, 0.738, 0.668, 0.563]) def _nv_emissivity(Te: float, ne: float, wave: int) -> float: r"""N V emissivity from a 3-level atom model. Solves statistical equilibrium for the Li-like N4+ ion (1s² 2s ²S – 1s² 2p ²P doublet). Parameters ---------- Te : float Electron temperature in K. ne : float Electron density in cm⁻³. wave : int Approximate wavelength label: ``1239`` (²P₃/₂→²S₁/₂) or ``1243`` (²P₁/₂→²S₁/₂). Returns ------- float Volume emissivity coefficient ε in erg cm³ s⁻¹ (same units as PyNEB ``Atom.getEmissivity``). """ logT = np.log10(Te) logT = np.clip(logT, _NV_LOGTEMP[0], _NV_LOGTEMP[-1]) ups_12 = float(np.interp(logT, _NV_LOGTEMP, _NV_UPS_12)) ups_13 = float(np.interp(logT, _NV_LOGTEMP, _NV_UPS_13)) ups_23 = float(np.interp(logT, _NV_LOGTEMP, _NV_UPS_23)) # Collision rate coefficients: q_ij = 8.629e-6 / (g_i * T^0.5) * Υ_ij coeff = 8.629e-6 / np.sqrt(Te) q12 = coeff * ups_12 / _NV_G[0] q13 = coeff * ups_13 / _NV_G[0] q21 = coeff * ups_12 / _NV_G[1] q31 = coeff * ups_13 / _NV_G[2] q23 = coeff * ups_23 / _NV_G[1] q32 = coeff * ups_23 / _NV_G[2] # Statistical equilibrium: Σ n_j (A_ji + n_e q_ji) = n_i Σ (A_ij + n_e q_ij) # For 3 levels, solve 2 independent equations + normalisation (n1+n2+n3=1). # Rate matrix M·n = 0, with n1+n2+n3 = 1. A21 = _NV_A21 A31 = _NV_A31 # From level 2: n2 (A21 + ne*q21 + ne*q23) = n1 * ne*q12 + n3 * (ne*q32) # From level 3: n3 (A31 + ne*q31 + ne*q32) = n1 * ne*q13 + n2 * (ne*q23) # Express n2, n3 in terms of n1 via matrix inversion. a = A21 + ne * q21 + ne * q23 b = -ne * q32 c = -ne * q23 d = A31 + ne * q31 + ne * q32 det = a * d - b * c if abs(det) < 1e-100: return 0.0 # Right-hand side coefficients (for n1=1) rhs2 = ne * q12 rhs3 = ne * q13 n2_rel = (d * rhs2 - b * rhs3) / det n3_rel = (a * rhs3 - c * rhs2) / det norm = 1.0 + n2_rel + n3_rel n2 = n2_rel / norm n3 = n3_rel / norm # Emissivity = n_level * A * hν / (ne * nion) → ε = fraction * A * hν # PyNEB convention: getEmissivity returns ε such that # j_line = ne * n_ion * ε (erg/cm³/s) # So ε = (n_upper / (ne * n_total)) * A * hν # With n_total = 1 (normalised), ε = n_upper * A * hν / ne if wave == 1239 or wave == 1238: lam_cm = _NV_WAVE[3] * 1e-8 eps = n3 * A31 * (_HC_CGS / lam_cm) / ne elif wave == 1243 or wave == 1242: lam_cm = _NV_WAVE[2] * 1e-8 eps = n2 * A21 * (_HC_CGS / lam_cm) / ne else: # Sum of doublet eps = (n3 * A31 * (_HC_CGS / (_NV_WAVE[3] * 1e-8)) + n2 * A21 * (_HC_CGS / (_NV_WAVE[2] * 1e-8))) / ne return float(eps) # --------------------------------------------------------------------------- # Line -> ion mapping # --------------------------------------------------------------------------- # Each line name maps to a list of (element, ion_stage, pyneb_wave) tuples. # Most lines are single; blended doublets (e.g. OII_doublet) sum components. _LINE_COMPONENTS: dict[str, list[tuple[str, int, int]]] = { "OIII_5007": [("O", 3, 5007)], "OIII_4959": [("O", 3, 4959)], "OIII_4363": [("O", 3, 4363)], "OII_3726": [("O", 2, 3726)], "OII_3729": [("O", 2, 3729)], "OII_doublet": [("O", 2, 3726), ("O", 2, 3729)], "NII_6585": [("N", 2, 6584)], "NII_6550": [("N", 2, 6548)], "NeIII_3869": [("Ne", 3, 3869)], "SII_6718": [("S", 2, 6718)], "SII_6732": [("S", 2, 6732)], "SIII_9069": [("S", 3, 9069)], "ArIII_7136": [("Ar", 3, 7136)], # UV semi-forbidden lines "NIV_1483": [("N", 4, 1483)], "NIV_1486": [("N", 4, 1487)], "NIII_1749": [("N", 3, 1749)], "NIII_1752": [("N", 3, 1752)], "NIII_doublet": [("N", 3, 1749), ("N", 3, 1752)], "CII]_2324": [("C", 2, 2324)], "CII]_2326": [("C", 2, 2326)], "CIII]_1907": [("C", 3, 1907)], "CIII]": [("C", 3, 1909)], "CIV_1": [("C", 4, 1548)], "CIV_2": [("C", 4, 1551)], "CIV_doublet": [("C", 4, 1548), ("C", 4, 1551)], "OIII_1661": [("O", 3, 1661)], "OIII_1666": [("O", 3, 1666)], # N V resonance doublet (manual emissivity, no PyNEB atom) "NV_1": [("N", 5, 1239)], "NV_2": [("N", 5, 1243)], "NV_doublet": [("N", 5, 1239), ("N", 5, 1243)], } # Maps (element, ion_stage) -> parameter name for ionic abundance. _ION_PARAM_NAME: dict[tuple[str, int], str] = { ("O", 3): "log_Opp", ("O", 2): "log_Op", ("N", 2): "log_Np", ("N", 3): "log_Npp", ("N", 4): "log_Nppp", ("Ne", 3): "log_Nepp", ("S", 2): "log_Sp", ("S", 3): "log_Spp", ("Ar", 3): "log_Arpp", ("C", 2): "log_Cp", ("C", 3): "log_Cpp", ("C", 4): "log_Cppp", ("N", 5): "log_Npppp", } # Human-readable labels for ionic abundance parameters. _PARAM_LABELS: dict[str, str] = { "log_Te": r"$\log\,T_e$ [K]", "log_ne": r"$\log\,n_e$ [cm$^{-3}$]", "log_Opp": r"$\log\,(\mathrm{O}^{2+}/\mathrm{H}^+)$", "log_Op": r"$\log\,(\mathrm{O}^{+}/\mathrm{H}^+)$", "log_Np": r"$\log\,(\mathrm{N}^{+}/\mathrm{H}^+)$", "log_Npp": r"$\log\,(\mathrm{N}^{2+}/\mathrm{H}^+)$", "log_Nppp": r"$\log\,(\mathrm{N}^{3+}/\mathrm{H}^+)$", "log_Nepp": r"$\log\,(\mathrm{Ne}^{2+}/\mathrm{H}^+)$", "log_Sp": r"$\log\,(\mathrm{S}^{+}/\mathrm{H}^+)$", "log_Spp": r"$\log\,(\mathrm{S}^{2+}/\mathrm{H}^+)$", "log_Arpp": r"$\log\,(\mathrm{Ar}^{2+}/\mathrm{H}^+)$", "log_Cp": r"$\log\,(\mathrm{C}^{+}/\mathrm{H}^+)$", "log_Cpp": r"$\log\,(\mathrm{C}^{2+}/\mathrm{H}^+)$", "log_Cppp": r"$\log\,(\mathrm{C}^{3+}/\mathrm{H}^+)$", "log_Npppp": r"$\log\,(\mathrm{N}^{4+}/\mathrm{H}^+)$", } # Uniform prior bounds for each parameter. _PRIOR_BOUNDS: dict[str, tuple[float, float]] = { "log_Te": (3.5, 5.0), "log_ne": (0.0, 5.0), "log_Opp": (-7.0, -2.0), "log_Op": (-7.0, -2.0), "log_Np": (-8.0, -3.0), "log_Npp": (-8.0, -3.0), "log_Nppp": (-8.0, -3.0), "log_Nepp": (-8.0, -3.0), "log_Sp": (-9.0, -4.0), "log_Spp": (-9.0, -4.0), "log_Arpp": (-9.0, -4.0), "log_Cpp": (-8.0, -3.0), "log_Cppp": (-8.0, -3.0), "log_Npppp": (-9.0, -3.0), } # --------------------------------------------------------------------------- # Observables and parameter construction # --------------------------------------------------------------------------- def _build_observables( line_fluxes: dict[str, float], line_errors: dict[str, float], ) -> tuple[ list[str], np.ndarray, np.ndarray, list[list[tuple[str, int, int]]], ]: """Build observed line/Hbeta flux ratios from available lines. Parameters ---------- line_fluxes : dict ``{line_name: flux}``. Must include ``"HBETA"``. line_errors : dict ``{line_name: flux_err}``. Returns ------- ratio_names : list of str Human-readable labels for each observable, e.g. ``"OIII_5007/Hb"``. obs_ratios : ndarray Observed line/Hbeta ratios. ratio_errors : ndarray 1-sigma errors on the ratios (Gaussian propagation). obs_components : list of list of tuple For each observable, the list of PyNEB ``(elem, ion, wave)`` components that sum to give the line flux. """ f_hb = line_fluxes.get("HBETA", 0.0) e_hb = line_errors.get("HBETA", 0.0) if f_hb <= 0: raise ValueError("HBETA flux is required and must be positive.") ratio_names: list[str] = [] obs_ratios: list[float] = [] ratio_errs: list[float] = [] obs_components: list[list[tuple[str, int, int]]] = [] # Lines that require more atomic levels than PyNEB's default data # provides (e.g. [OIII] 1661/1666 need a 6-level O++ atom). _SKIP_LINES = {"OIII_1661", "OIII_1666"} for name, components in _LINE_COMPONENTS.items(): if name in _SKIP_LINES: continue if name not in line_fluxes or line_fluxes[name] <= 0: continue f_line = line_fluxes[name] e_line = line_errors.get(name, 0.0) ratio = f_line / f_hb # Error propagation: sigma_R = R * sqrt((sigma_l/f_l)^2 + (sigma_hb/f_hb)^2) if e_line > 0 and e_hb > 0: rel_err = ratio * np.sqrt( (e_line / f_line) ** 2 + (e_hb / f_hb) ** 2 ) else: rel_err = 0.1 * ratio # 10 % default ratio_names.append(f"{name}/Hb") obs_ratios.append(ratio) ratio_errs.append(rel_err) obs_components.append(components) if not ratio_names: raise ValueError("No valid emission lines found for forward modelling.") return ( ratio_names, np.array(obs_ratios), np.array(ratio_errs), obs_components, ) def _determine_free_params( obs_components: list[list[tuple[str, int, int]]], ) -> list[str]: """Determine free parameters from the ions present in the data. Always includes ``log_Te`` and ``log_ne``, plus one log-ionic-abundance parameter for each distinct ``(element, ion)`` pair. Parameters ---------- obs_components : list Output of :func:`_build_observables`. Returns ------- list of str Parameter names, e.g. ``["log_Te", "log_ne", "log_Opp", "log_Nepp"]``. """ params = ["log_Te", "log_ne"] seen_ions: set[tuple[str, int]] = set() for components in obs_components: for elem, ion, _ in components: key = (elem, ion) if key not in seen_ions: seen_ions.add(key) param_name = _ION_PARAM_NAME.get(key) if param_name is not None: params.append(param_name) return params # --------------------------------------------------------------------------- # Model prediction, likelihood, prior, posterior # --------------------------------------------------------------------------- def _predict_ratios( theta: np.ndarray, param_names: list[str], obs_components: list[list[tuple[str, int, int]]], pyneb_atoms: dict[tuple[str, int], Any], ) -> np.ndarray: """Predict line/Hbeta ratios for a given parameter vector. Parameters ---------- theta : ndarray Current parameter values. param_names : list of str Names matching *theta*. obs_components : list Per-observable list of ``(elem, ion, wave)`` components. pyneb_atoms : dict Pre-initialised PyNEB ``Atom`` objects keyed by ``(elem, ion)``. Returns ------- ndarray Predicted line/Hbeta ratios (same length as *obs_components*). """ params = dict(zip(param_names, theta)) Te = 10.0 ** params["log_Te"] ne = 10.0 ** params["log_ne"] eps_hb = hbeta_emissivity_aller84(Te) predicted = np.empty(len(obs_components)) for i, components in enumerate(obs_components): # All components belong to the same ion. elem0, ion0, _ = components[0] ion_key = (elem0, ion0) param_name = _ION_PARAM_NAME[ion_key] abund = 10.0 ** params[param_name] if ion_key == ("N", 5): # N V: manual 3-level atom (PyNEB has no N 5 data). eps_line = sum( _nv_emissivity(Te, ne, wave=w) for _, _, w in components ) else: atom = pyneb_atoms[ion_key] eps_line = sum( atom.getEmissivity(Te, ne, wave=w) for _, _, w in components ) # F_line / F_Hb = (X^i+ / H+) * eps_line / eps_Hb predicted[i] = abund * eps_line / eps_hb return predicted def _log_likelihood( theta: np.ndarray, param_names: list[str], obs_ratios: np.ndarray, ratio_errors: np.ndarray, obs_components: list[list[tuple[str, int, int]]], pyneb_atoms: dict[tuple[str, int], Any], ) -> float: """Gaussian log-likelihood comparing predicted to observed ratios.""" try: predicted = _predict_ratios( theta, param_names, obs_components, pyneb_atoms, ) except Exception: return -np.inf if not np.all(np.isfinite(predicted)) or np.any(predicted <= 0): return -np.inf residuals = (obs_ratios - predicted) / ratio_errors return -0.5 * np.sum(residuals ** 2) def _log_prior(theta: np.ndarray, param_names: list[str]) -> float: """Uniform log-prior within prescribed bounds.""" for val, name in zip(theta, param_names): lo, hi = _PRIOR_BOUNDS[name] if val < lo or val > hi: return -np.inf return 0.0 def _log_posterior( theta: np.ndarray, param_names: list[str], obs_ratios: np.ndarray, ratio_errors: np.ndarray, obs_components: list[list[tuple[str, int, int]]], pyneb_atoms: dict[tuple[str, int], Any], ) -> float: """Log-posterior = log-prior + log-likelihood.""" lp = _log_prior(theta, param_names) if not np.isfinite(lp): return -np.inf return lp + _log_likelihood( theta, param_names, obs_ratios, ratio_errors, obs_components, pyneb_atoms, ) # --------------------------------------------------------------------------- # Sampler drivers # --------------------------------------------------------------------------- def _find_map( param_names: list[str], obs_ratios: np.ndarray, ratio_errors: np.ndarray, obs_components: list[list[tuple[str, int, int]]], pyneb_atoms: dict[tuple[str, int], Any], seed: int, ) -> np.ndarray: """Find a MAP estimate to seed the sampler. Runs ``scipy.optimize.differential_evolution`` to find a good starting point within the prior bounds. Parameters ---------- param_names, obs_ratios, ratio_errors, obs_components, pyneb_atoms Model specification (see :func:`forward_model`). seed : int Random seed. Returns ------- ndarray MAP parameter vector. """ from scipy.optimize import differential_evolution bounds = [_PRIOR_BOUNDS[n] for n in param_names] def neg_logpost(theta): val = _log_posterior( theta, param_names, obs_ratios, ratio_errors, obs_components, pyneb_atoms, ) return -val if np.isfinite(val) else 1e30 result = differential_evolution( neg_logpost, bounds, seed=seed, maxiter=500, tol=1e-6, polish=True, ) logger.info("MAP optimisation: success=%s, fun=%.2f", result.success, result.fun) return result.x def _run_emcee( param_names: list[str], obs_ratios: np.ndarray, ratio_errors: np.ndarray, obs_components: list[list[tuple[str, int, int]]], pyneb_atoms: dict[tuple[str, int], Any], *, n_walkers: int, n_steps: int, n_burn: int, seed: int, progress: bool, ) -> np.ndarray: """Run emcee MCMC and return flattened posterior samples. Parameters ---------- param_names, obs_ratios, ratio_errors, obs_components, pyneb_atoms Model specification. n_walkers : int Number of walkers. n_steps : int Total MCMC steps (including burn-in). n_burn : int Burn-in steps to discard. seed : int Random seed. progress : bool Show progress bar. Returns ------- ndarray, shape ``(n_samples, n_dim)`` Flattened posterior samples after burn-in removal. """ import emcee n_dim = len(param_names) rng = np.random.default_rng(seed) # Seed walkers around the MAP estimate. map_est = _find_map( param_names, obs_ratios, ratio_errors, obs_components, pyneb_atoms, seed, ) logger.info("MAP estimate: %s", dict(zip(param_names, map_est))) p0 = np.empty((n_walkers, n_dim)) for j in range(n_dim): lo, hi = _PRIOR_BOUNDS[param_names[j]] scale = 0.01 * (hi - lo) p0[:, j] = np.clip( rng.normal(map_est[j], scale, n_walkers), lo + 1e-6, hi - 1e-6, ) sampler = emcee.EnsembleSampler( n_walkers, n_dim, _log_posterior, args=( param_names, obs_ratios, ratio_errors, obs_components, pyneb_atoms, ), ) sampler.run_mcmc(p0, n_steps, progress=progress) return sampler.get_chain(discard=n_burn, flat=True) def _run_dynesty( param_names: list[str], obs_ratios: np.ndarray, ratio_errors: np.ndarray, obs_components: list[list[tuple[str, int, int]]], pyneb_atoms: dict[tuple[str, int], Any], *, n_live: int, seed: int, ) -> np.ndarray: """Run dynesty nested sampling and return weighted posterior samples. Parameters ---------- param_names, obs_ratios, ratio_errors, obs_components, pyneb_atoms Model specification. n_live : int Number of live points. seed : int Random seed. Returns ------- ndarray, shape ``(n_samples, n_dim)`` Equally-weighted posterior samples. """ try: import dynesty except ImportError as exc: raise ImportError( "dynesty is required for nested sampling. " "Install with: pip install dynesty" ) from exc n_dim = len(param_names) def loglike(theta): return _log_likelihood( theta, param_names, obs_ratios, ratio_errors, obs_components, pyneb_atoms, ) def prior_transform(u): """Map unit cube [0, 1]^n to the prior volume.""" theta = np.empty(n_dim) for j, name in enumerate(param_names): lo, hi = _PRIOR_BOUNDS[name] theta[j] = lo + u[j] * (hi - lo) return theta sampler = dynesty.NestedSampler( loglike, prior_transform, n_dim, nlive=n_live, rstate=np.random.default_rng(seed), ) sampler.run_nested(print_progress=True) results = sampler.results # Resample to get equally-weighted samples. from dynesty.utils import resample_equal weights = np.exp(results.logwt - results.logz[-1]) return resample_equal(results.samples, weights) # --------------------------------------------------------------------------- # Build result dict from posterior samples # --------------------------------------------------------------------------- def _build_forward_result( samples: np.ndarray, param_names: list[str], ratio_names: list[str], ) -> dict[str, Any]: """Summarise posterior samples into a result dictionary. Parameters ---------- samples : ndarray, shape ``(n, n_dim)`` Posterior samples. param_names : list of str Parameter names. ratio_names : list of str Observable names (for metadata). Returns ------- dict Contains posteriors, medians, and 16/84-percentile intervals for Te, ne, ionic abundances, and 12+log(O/H). """ result: dict[str, Any] = { "param_names": param_names, "param_labels": [_PARAM_LABELS.get(n, n) for n in param_names], "samples": samples, "ratio_names": ratio_names, } def _summarise(arr: np.ndarray) -> tuple[float, tuple[float, float]]: med = float(np.median(arr)) lo = float(med - np.percentile(arr, 16)) hi = float(np.percentile(arr, 84) - med) return med, (lo, hi) # Te idx_Te = param_names.index("log_Te") Te_post = 10.0 ** samples[:, idx_Te] result["Te_posterior"] = Te_post result["Te"], result["Te_err"] = _summarise(Te_post) # ne idx_ne = param_names.index("log_ne") ne_post = 10.0 ** samples[:, idx_ne] result["ne_posterior"] = ne_post result["ne"], result["ne_err"] = _summarise(ne_post) # Ionic abundance posteriors ionic_posteriors: dict[str, np.ndarray] = {} ionic_medians: dict[str, float] = {} for j, name in enumerate(param_names): if name.startswith("log_") and name not in ("log_Te", "log_ne"): ionic_posteriors[name] = samples[:, j] ionic_medians[name] = float(np.median(samples[:, j])) result["ionic_posteriors"] = ionic_posteriors result["ionic"] = ionic_medians # 12 + log(O/H) if "log_Opp" in ionic_posteriors: log_opp = ionic_posteriors["log_Opp"] if "log_Op" in ionic_posteriors: log_op = ionic_posteriors["log_Op"] OH = 10.0 ** log_opp + 10.0 ** log_op else: # No [OII] — assume O^2+ ~ O (paper's assumption) OH = 10.0 ** log_opp OH_12 = 12.0 + np.log10(OH) result["OH_posterior"] = OH_12 result["OH"], result["OH_err"] = _summarise(OH_12) else: result["OH"] = np.nan result["OH_err"] = np.nan result["OH_posterior"] = None # log(N/O) — sum all available nitrogen ions in linear space, divide # by total oxygen. Falls back to N+/O+ if only optical N+ is available. n_ion_keys = [ ("log_Np", "N+"), ("log_Npp", "N2+"), ("log_Nppp", "N3+"), ("log_Npppp", "N4+"), ] n_posteriors_lin = [] for key, _label in n_ion_keys: if key in ionic_posteriors: n_posteriors_lin.append(10.0 ** ionic_posteriors[key]) o_posteriors_lin = [] for key in ("log_Op", "log_Opp"): if key in ionic_posteriors: o_posteriors_lin.append(10.0 ** ionic_posteriors[key]) if n_posteriors_lin and o_posteriors_lin: N_total = sum(n_posteriors_lin) O_total = sum(o_posteriors_lin) with np.errstate(invalid="ignore", divide="ignore"): NO_post = np.where( (N_total > 0) & (O_total > 0), np.log10(N_total / O_total), np.nan, ) valid = np.isfinite(NO_post) if np.any(valid): result["NO_posterior"] = NO_post result["NO"], result["NO_err"] = _summarise(NO_post[valid]) else: result["NO"] = None result["NO_err"] = None result["NO_posterior"] = None else: result["NO"] = None result["NO_err"] = None result["NO_posterior"] = None # log(Ne/O) if Ne^2+ and O^2+ available if "log_Nepp" in ionic_posteriors and "log_Opp" in ionic_posteriors: NeO_post = ionic_posteriors["log_Nepp"] - ionic_posteriors["log_Opp"] result["NeO_posterior"] = NeO_post result["NeO"], result["NeO_err"] = _summarise(NeO_post) else: result["NeO"] = None result["NeO_err"] = None # log(C/O) — sum C2+ and C3+ in linear space, divide by O2+ c_ion_keys = [("log_Cpp", "C2+"), ("log_Cppp", "C3+")] c_posteriors_lin = [] for key, _label in c_ion_keys: if key in ionic_posteriors: c_posteriors_lin.append(10.0 ** ionic_posteriors[key]) if c_posteriors_lin and "log_Opp" in ionic_posteriors: C_total = sum(c_posteriors_lin) O_pp_lin = 10.0 ** ionic_posteriors["log_Opp"] with np.errstate(invalid="ignore", divide="ignore"): CO_post = np.where( (C_total > 0) & (O_pp_lin > 0), np.log10(C_total / O_pp_lin), np.nan, ) valid = np.isfinite(CO_post) if np.any(valid): result["CO_posterior"] = CO_post result["CO"], result["CO_err"] = _summarise(CO_post[valid]) else: result["CO"] = None result["CO_err"] = None result["CO_posterior"] = None else: result["CO"] = None result["CO_err"] = None result["CO_posterior"] = None return result # --------------------------------------------------------------------------- # Public API # ---------------------------------------------------------------------------
[docs] def forward_model( line_fluxes: dict[str, float], line_errors: dict[str, float], *, sampler: str = "emcee", n_walkers: int = 32, n_steps: int = 5000, n_burn: int = 1000, n_live: int = 500, seed: int = 42, progress: bool = True, ) -> dict[str, Any]: """Run a Bayesian forward model on emission-line fluxes. Free parameters are log(T_e), log(n_e), and one log(ionic abundance) per detected ion. The model predicts line/Hbeta flux ratios using PyNEB CEL emissivities and the Aller (1984) Hbeta formula, and the Gaussian likelihood compares predictions to observations. Parameters ---------- line_fluxes : dict ``{line_name: flux}``. Must include ``"HBETA"``. line_errors : dict ``{line_name: flux_err}``. sampler : str ``"emcee"`` (default) or ``"dynesty"``. n_walkers : int Number of walkers for emcee (default 32). n_steps : int Total MCMC steps for emcee (default 5000). n_burn : int Burn-in steps to discard for emcee (default 1000). n_live : int Number of live points for dynesty (default 500). seed : int Random seed (default 42). progress : bool Show progress bar (default ``True``). Returns ------- dict Result dictionary with keys: - ``param_names``, ``param_labels`` — parameter metadata - ``samples`` — raw posterior samples, shape ``(n, n_dim)`` - ``Te``, ``Te_err``, ``Te_posterior`` - ``ne``, ``ne_err``, ``ne_posterior`` - ``OH``, ``OH_err``, ``OH_posterior`` - ``NO``, ``NO_err``, ``NO_posterior`` (if nitrogen detected) - ``ionic`` — dict of median log-ionic abundances - ``ionic_posteriors`` — dict of log-ionic posterior arrays - ``ratio_names`` — observable labels Examples -------- >>> import jwspecabund >>> fluxes = { ... "HBETA": 1.0, "OIII_5007": 5.0, ... "OIII_4363": 0.05, "NeIII_3869": 0.3, ... } >>> errors = {k: 0.05 * v for k, v in fluxes.items()} >>> res = jwspecabund.forward_model(fluxes, errors, n_steps=2000) >>> print(f"12+log(O/H) = {res['OH']:.2f}") """ try: import pyneb as pn except ImportError as exc: raise ImportError( "PyNEB is required for the forward model. " "Install with: pip install 'jwspecfit[abund]'" ) from exc # Build observables. ratio_names, obs_ratios, ratio_errors, obs_components = _build_observables( line_fluxes, line_errors, ) # Determine free parameters. param_names = _determine_free_params(obs_components) n_dim = len(param_names) logger.info( "Forward model: %d observables (%s), %d free parameters (%s)", len(obs_ratios), ", ".join(ratio_names), n_dim, ", ".join(param_names), ) # Pre-initialise PyNEB Atom objects (skip N V — uses manual emissivity). pyneb_atoms: dict[tuple[str, int], Any] = {} for components in obs_components: for elem, ion, _ in components: key = (elem, ion) if key not in pyneb_atoms and key != ("N", 5): pyneb_atoms[key] = pn.Atom(elem, ion) # Run sampler. if sampler == "emcee": samples = _run_emcee( param_names, obs_ratios, ratio_errors, obs_components, pyneb_atoms, n_walkers=n_walkers, n_steps=n_steps, n_burn=n_burn, seed=seed, progress=progress, ) elif sampler == "dynesty": samples = _run_dynesty( param_names, obs_ratios, ratio_errors, obs_components, pyneb_atoms, n_live=n_live, seed=seed, ) else: raise ValueError( f"Unknown sampler: {sampler!r}. Use 'emcee' or 'dynesty'." ) return _build_forward_result(samples, param_names, ratio_names)