from dataclasses import dataclass, field
from enum import Enum, auto
from fractions import Fraction
from typing import TYPE_CHECKING, Callable, Optional, Sequence, Tuple, cast
import numpy as np
from weno4 import weno4
import lightweaver.constants as Const
from lightweaver.constants import VMICRO_CHAR
from .atomic_table import Element, PeriodicTable
from .broadening import LineBroadening
from .utils import gaunt_bf, sequence_repr
from .zeeman import ZeemanComponents, compute_zeeman_components
if TYPE_CHECKING:
from .atmosphere import Atmosphere
from .atomic_set import SpeciesStateTable
from .collisional_rates import CollisionalRates
[docs]
@dataclass
class AtomicModel:
'''
Container class for the complete description of a model atom.
Attributes
----------
element : Element
The element or ion represented by this model.
levels : list of AtomicLevel
The levels in use in this model.
lines : list of AtomicLine
The atomic lines present in this model.
continua : list of AtomicContinuum
The atomic continua present in this model.
collisions : list of CollisionalRates
The collisional rates present in this model.
'''
element: Element
levels: Sequence['AtomicLevel']
lines: Sequence['AtomicLine']
continua: Sequence['AtomicContinuum']
collisions: Sequence['CollisionalRates']
# @profile
def __post_init__(self):
for l in self.levels:
l.setup(self)
for l in self.lines:
l.setup(self)
for c in self.continua:
c.setup(self)
for c in self.collisions:
c.setup(self)
def __repr__(self):
s = 'AtomicModel(element=%s,\n\tlevels=[\n' % repr(self.element)
for l in self.levels:
s += '\t\t' + repr(l) + ',\n'
s += '\t],\n\tlines=[\n'
for l in self.lines:
s += '\t\t' + repr(l) + ',\n'
s += '\t],\n\tcontinua=[\n'
for c in self.continua:
s += '\t\t' + repr(c) + ',\n'
s += '\t],\n\tcollisions=[\n'
for c in self.collisions:
s += '\t\t' + repr(c) + ',\n'
s += '])\n'
return s
# def __hash__(self):
# return hash(repr(self))
[docs]
def vBroad(self, atmos: 'Atmosphere') -> np.ndarray:
'''
Computes the atomic broadening velocity structure for a given
atmosphere from the thermal motions and microturbulent velocity.
'''
vTherm = 2.0 * Const.KBoltzmann / (Const.Amu * PeriodicTable[self.element].mass)
vBroad = np.sqrt(vTherm * atmos.temperature + atmos.vturb**2)
return vBroad
@property
def transitions(self) -> Sequence['AtomicTransition']:
'''
List of all atomic transitions present on the model.
'''
return self.lines + self.continua # type: ignore
def element_sort(atom: AtomicModel):
return atom.element
[docs]
@dataclass
class AtomicLevel:
'''
Description of atomic level in model atom.
Attributes
----------
E : float
Energy above ground state [cm-1]
g : float
Statistical weight of level
label : str
Name for level
stage : int
Ionisation of level with 0 being neutral
atom : AtomicModel
AtomicModel that holds this level, will be initialised by the atom.
J : Fraction, optional
Total quantum angular momentum.
L : int, optional
Orbital angular momentum.
S : Fraction, optional
Spin.
'''
E: float
g: float
label: str
stage: int
atom: AtomicModel = field(init=False)
J: Optional[Fraction] = None
L: Optional[int] = None
S: Optional[Fraction] = None
def setup(self, atom):
self.atom = atom
def __hash__(self):
return hash((self.E, self.g, self.label, self.stage, self.J, self.L, self.S))
def __eq__(self, other: object) -> bool:
if isinstance(other, AtomicLevel):
return hash(self) == hash(other)
return False
@property
def lsCoupling(self) -> bool:
'''
Returns whether the L-S coupling formalism can be applied to this
level.
'''
if all(x is not None for x in (self.J, self.L, self.S)):
J = cast(Fraction, self.J)
L = cast(int, self.L)
S = cast(Fraction, self.S)
if J <= L + S:
return True
return False
@property
def E_SI(self):
'''
Returns E in Joule.
'''
return self.E * Const.HC / Const.CM_TO_M
@property
def E_eV(self):
'''
Returns E in electron volt.
'''
return self.E_SI / Const.EV
def __repr__(self):
s = ('AtomicLevel(E=%10.3f, g=%g, label="%s", stage=%d, '
'J=%s, L=%s, S=%s)') % (self.E, self.g, self.label, self.stage,
repr(self.J), repr(self.L), repr(self.S))
return s
[docs]
class LineType(Enum):
'''
Enum to show if the line should be treated in CRD or PRD.
'''
CRD = 0
PRD = auto()
def __repr__(self):
if self == LineType.CRD:
return 'LineType.CRD'
elif self == LineType.PRD:
return 'LineType.PRD'
else:
raise ValueError('Unknown LineType in LineType.__repr__')
[docs]
@dataclass
class LineQuadrature:
'''
Describes the wavelength quadrature to be used for integrating properties
associated with a line.
'''
def setup(self, line: 'AtomicLine'):
pass
[docs]
def doppler_units(self, line: 'AtomicLine') -> np.ndarray:
'''
Return the quadrature in Doppler units.
'''
raise NotImplementedError
[docs]
def wavelength(self, line: 'AtomicLine', vMicroChar: float=Const.VMICRO_CHAR) -> np.ndarray:
'''
Return the quadrature in nm.
'''
raise NotImplementedError
def __repr__(self):
raise NotImplementedError
def __hash__(self):
raise NotImplementedError
[docs]
@dataclass
class LinearQuadrature(LineQuadrature):
"""
Simple linearly spaced wavelength grid. Primarily provided for CRTAF
interaction.
Nlambda : int
The number of wavelength points in the wavelength grid (typically odd).
deltaLambda : int
The half-width of the grid (i.e. from core to one edge) [nm].
"""
Nlambda: int
deltaLambda: float
def __repr__(self):
s = '%s(Nlambda=%d, deltaLambda=%g)' % (type(self).__name__,
self.Nlambda, self.deltaLambda)
return s
[docs]
def wavelength(self, line: "AtomicLine", vMicroChar: float = Const.VMICRO_CHAR) -> np.ndarray:
return np.linspace(line.lambda0 - self.deltaLambda, line.lambda0 + self.deltaLambda, self.Nlambda)
[docs]
def doppler_units(self, line: "AtomicLine") -> np.ndarray:
wavelength_grid = self.wavelength(line)
vMicroChar = VMICRO_CHAR
qToLambda = line.lambda0 * (vMicroChar / Const.CLight)
return (wavelength_grid - line.lambda0) / qToLambda
[docs]
@dataclass
class TabulatedQuadrature(LineQuadrature):
"""
Tabulated wavelength quadrature. Primarily provided for CRTAF interaction.
wavelengthGrid : Sequence[float]
The wavelength sample points [nm].
"""
wavelengthGrid: Sequence[float]
def __repr__(self):
s = '%s(wavelengthGrid=%s)' % (type(self).__name__, sequence_repr(self.wavelengthGrid))
return s
[docs]
def wavelength(self, line: "AtomicLine", vMicroChar: float = Const.VMICRO_CHAR) -> np.ndarray:
return np.ascontiguousarray(self.wavelengthGrid) + line.lambda0
[docs]
def doppler_units(self, line: "AtomicLine") -> np.ndarray:
wavelength_grid = self.wavelength(line)
vMicroChar = VMICRO_CHAR
qToLambda = line.lambda0 * (vMicroChar / Const.CLight)
return (wavelength_grid - line.lambda0) / qToLambda
[docs]
@dataclass
class LinearCoreExpWings(LineQuadrature):
"""
RH-Style line quadrature, with approximately linear core spacing and
exponential wing spacing, by using a function of the form
q(n) = a*(n + (exp(b*n)-1))
with n in [0, N) satisfying the following conditions:
- q[0] = 0
- q[(N-1)/2] = qcore
- q[N-1] = qwing.
If qWing <= 2 * qCore, linear grid spacing will be used for this transition.
"""
qCore: float
qWing: float
Nlambda: int
beta: float = field(init=False)
def __repr__(self):
s = '%s(qCore=%g, qWing=%g, Nlambda=%d)' % (type(self).__name__,
self.qCore, self.qWing, self.Nlambda)
return s
def __hash__(self):
return hash((self.qCore, self.qWing, self.Nlambda))
def setup(self, line: 'AtomicLine'):
if self.qWing <= 2.0 * self.qCore:
# Use linear scale to qWing
self.beta = 1.0
else:
self.beta = self.qWing / (2.0 * self.qCore)
[docs]
def doppler_units(self, line: 'AtomicLine') -> np.ndarray:
Nlambda = self.Nlambda // 2 if self.Nlambda % 2 == 1 else (self.Nlambda - 1) // 2
Nlambda += 1
beta = self.beta
y = beta + np.sqrt(beta**2 + (beta - 1.0) * Nlambda + 2.0 - 3.0 * beta)
b = 2.0 * np.log(y) / (Nlambda - 1)
a = self.qWing / (Nlambda - 2.0 + y**2)
nl = np.arange(Nlambda)
q: np.ndarray = a * (nl + (np.exp(b * nl) - 1.0))
NlambdaFull = 2 * Nlambda - 1
result = np.zeros(NlambdaFull)
Nmid = Nlambda - 1
result[:Nmid][::-1] = -q[1:]
result[Nmid+1:] = q[1:]
return result
[docs]
def wavelength(self, line: 'AtomicLine', vMicroChar=Const.VMICRO_CHAR) -> np.ndarray:
qToLambda = line.lambda0 * (vMicroChar / Const.CLight)
result = self.doppler_units(line)
result *= qToLambda
result += line.lambda0
return result
[docs]
@dataclass
class AtomicTransition:
'''
Basic storage class for atomic transitions. Both lines and continua are
derived from this.
'''
j: int
i: int
atom: AtomicModel = field(init=False)
jLevel: AtomicLevel = field(init=False)
iLevel: AtomicLevel = field(init=False)
def setup(self, atom: AtomicModel):
if self.j < self.i:
self.i, self.j = self.j, self.i
self.atom = atom
self.jLevel: AtomicLevel = self.atom.levels[self.j]
self.iLevel: AtomicLevel = self.atom.levels[self.i]
def __hash__(self):
raise NotImplementedError
def __eq__(self, other: object) -> bool:
if other is self:
return True
return repr(self) == repr(other)
def wavelength(self) -> np.ndarray:
raise NotImplementedError
@property
def lambda0(self) -> float:
raise NotImplementedError
@property
def lambda0_m(self) -> float:
raise NotImplementedError
@property
def transId(self) -> Tuple[Element, int, int]:
'''
Unique identifier (transition ID) for transition (assuming one copy
of each Element), used in creating a SpectrumConfiguration etc.
'''
return (self.atom.element, self.i, self.j)
[docs]
@dataclass
class LineProfileState:
'''
Dataclass used to communicate line profile calculations from the backend
to the frontend whilst allowing the backend to provide an overrideable
optimised voigt implementation for the default case.
Attributes
----------
wavelength : np.ndarray
Wavelengths at which to compute the line profile [nm]
vlosMu : np.ndarray
Bulk velocity projected onto each ray in the angular integration scheme
[m/s] in an array of [Nmu, Nspace].
atmos : Atmosphere
The associated atmosphere.
eqPops : SpeciesStateTable
The associated populations for each species present in the simulation.
default_voigt_callback : callable
Computes the Voigt profile for the default case, takes the damping
parameter aDamp and broadening velocity vBroad as arguments, and
returns the line profile phi (in this case phi_num in the tech report).
vBroad : np.ndarray, optional
Cache to avoid recomputing vBroad every time. May be None.
'''
wavelength: np.ndarray
vlosMu: np.ndarray
atmos: 'Atmosphere'
eqPops: 'SpeciesStateTable'
default_voigt_callback: Callable[[np.ndarray, np.ndarray], np.ndarray]
vBroad: Optional[np.ndarray]=None
[docs]
@dataclass
class LineProfileResult:
'''
Dataclass for returning the line profile and associated data that needs
to be saved (damping parameter and elastic collision rate) from the
frontend to the backend.
'''
phi: np.ndarray
aDamp: np.ndarray
Qelast: np.ndarray
[docs]
@dataclass(eq=False)
class AtomicLine(AtomicTransition):
'''
Base class for atomic lines, holding their specialised information over
transitions.
Attributes
----------
f : float
Oscillator strength.
type : LineType
Should the line be treated in PRD or CRD.
quadrature : LineQuadrature
Wavelength quadrature for integrating line properties over.
broadening : LineBroadening
Object describing the broadening processes to be used in conjunction
with the quadrature to generate the line profile.
gLandeEff : float, optional
Optionally override LS-coupling (if available for this transition),
and just directly set the effective Lande g factor (if it isn't).
'''
f: float
type: LineType
quadrature: LineQuadrature
broadening: LineBroadening
gLandeEff: Optional[float] = None
def setup(self, atom: AtomicModel):
super().setup(atom)
self.quadrature.setup(self)
self.broadening.setup(self)
def __repr__(self):
s = '%s(j=%d, i=%d, f=%9.3e, type=%s, quadrature=%s, broadening=%s' % (
type(self).__name__,
self.j, self.i, self.f, repr(self.type),
repr(self.quadrature), repr(self.broadening))
if self.gLandeEff is not None:
s += ', gLandeEff=%e' % self.gLandeEff
s += ')'
return s
def __hash__(self):
return hash(repr(self))
[docs]
def wavelength(self, vMicroChar=Const.VMICRO_CHAR) -> np.ndarray:
'''
Returns the wavelength grid for this transition based on the
LineQuadrature.
Parameters
----------
vMicroChar : float, optional
Characterisitc microturbulent velocity to assume when computing
the line quadrature (default 3e3 m/s).
'''
return self.quadrature.wavelength(self, vMicroChar=vMicroChar)
[docs]
def zeeman_components(self) -> Optional[ZeemanComponents]:
'''
Returns the Zeeman components of a line, if possible or None.
'''
return compute_zeeman_components(self)
[docs]
def compute_phi(self, state: LineProfileState) -> LineProfileResult:
'''
Compute the line profile, intended to be called from the backend.
'''
raise NotImplementedError
@property
def overlyingContinuumLevel(self) -> AtomicLevel:
'''
Find the first overlying continuum level.
'''
Z = self.jLevel.stage + 1
j = self.j
ic = j + 1
try:
while self.atom.levels[ic].stage < Z:
ic += 1
cont = self.atom.levels[ic]
return cont
except IndexError:
raise ValueError('No overlying continuum level found for line %s' % repr(self))
@property
def lambda0(self) -> float:
'''
Return the line rest wavelength [nm].
'''
return self.lambda0_m / Const.NM_TO_M
@property
def lambda0_m(self) -> float:
'''
Return the line rest wavelength [m].
'''
deltaE = self.jLevel.E_SI - self.iLevel.E_SI
return Const.HC / deltaE
@property
def Aji(self) -> float:
'''
Return the Einstein A coefficient for this line.
'''
gRatio = self.iLevel.g / self.jLevel.g
C: float = 2 * np.pi * (Const.QElectron / Const.Epsilon0) \
* (Const.QElectron / Const.MElectron) / Const.CLight
return C / self.lambda0_m**2 * gRatio * self.f
@property
def Bji(self) -> float:
'''
Return the Einstein B_{ji} coefficient for this line.
'''
return self.lambda0_m**3 / (2.0 * Const.HC) * self.Aji
@property
def Bij(self) -> float:
'''
Return the Einstein B_{ij} coefficient for this line.
'''
return self.jLevel.g / self.iLevel.g * self.Bji
@property
def polarisable(self) -> bool:
'''
Return whether sufficient information is available to compute full
Stokes solutions for this line.
'''
return (self.iLevel.lsCoupling and self.jLevel.lsCoupling) or (self.gLandeEff is not None)
[docs]
@dataclass(eq=False, repr=False)
class VoigtLine(AtomicLine):
'''
Specialised line profile for the default case of a Voigt profile.
'''
[docs]
def damping(self, atmos: 'Atmosphere', eqPops: 'SpeciesStateTable',
vBroad: Optional[np.ndarray]=None):
'''
Computes the damping parameter and elastic collision rate.
Parameters
----------
atmos : Atmosphere
The atmosphere to consider.
eqPops : SpeciesStateTable
The populations in this atmosphere.
vBroad : np.ndarray, optional
The broadening velocity, will be used if passed, or computed
using atom.vBroad if not.
Returns
-------
aDamp : np.ndarray
The Voigt damping parameter.
Qelast : np.ndarray
The rate of elastic collisions broadening the line -- needed for PRD.
'''
Qs = self.broadening.broaden(atmos, eqPops)
if vBroad is None:
vBroad = self.atom.vBroad(atmos)
cDop = self.lambda0_m / (4.0 * np.pi)
aDamp = (Qs.natural + Qs.Qelast) * cDop / vBroad
return aDamp, Qs.Qelast
[docs]
def compute_phi(self, state: LineProfileState) -> LineProfileResult:
'''
Computes the line profile.
In the case of a VoigtLine the line profile simply uses the
default_voigt_callback from the backend.
Parameters
----------
state : LineProfileState
The information from the backend
Returns
-------
result : LineProfileResult
The line profile, as well as the damping parameter 'a' and and
the broadening velocity.
'''
vBroad = self.atom.vBroad(state.atmos) if state.vBroad is None else state.vBroad
aDamp, Qelast = self.damping(state.atmos, state.eqPops, vBroad=vBroad)
cb = state.default_voigt_callback
# NOTE(cmo): This is affected by mypy #5485, so we ignore typing for now
phi = state.default_voigt_callback(aDamp, vBroad) # type: ignore
return LineProfileResult(phi=phi, aDamp=aDamp, Qelast=Qelast)
[docs]
@dataclass(eq=False)
class AtomicContinuum(AtomicTransition):
'''
Base class for atomic continua.
'''
def setup(self, atom: AtomicModel):
super().setup(atom)
def __repr__(self):
s = 'AtomicContinuum(j=%d, i=%d)' % (self.j, self.i)
return s
def __hash__(self):
return hash(repr(self))
[docs]
def alpha(self, wavelength: np.ndarray) -> np.ndarray:
'''
Returns the cross-section as a function of wavelength
Parameters
----------
wavelength : np.ndarray
The wavelengths at which to compute the cross-section
Returns
-------
alpha : np.ndarray
The cross-section for each wavelength
'''
raise NotImplementedError
[docs]
def wavelength(self) -> np.ndarray:
'''
The wavelength grid on which this continuum's cross section is defined.
'''
raise NotImplementedError
@property
def minLambda(self) -> float:
'''
The minimum wavelength at which this transition contributes.
'''
raise NotImplementedError
@property
def lambda0(self) -> float:
'''
The maximum (edge) wavelength at which this transition contributes [nm].
'''
return self.lambda0_m / Const.NM_TO_M
@property
def lambdaEdge(self) -> float:
'''
The maximum (edge) wavelength at which this transition contributes [nm].
'''
return self.lambda0
@property
def lambda0_m(self) -> float:
'''
The maximum (edge) wavelength at which this transition contributes [m].
'''
deltaE = self.jLevel.E_SI - self.iLevel.E_SI
return Const.HC / deltaE
@property
def polarisable(self) -> bool:
'''
Returns whether this continuum is polarisable, always False.
'''
return False
[docs]
@dataclass(eq=False)
class ExplicitContinuum(AtomicContinuum):
'''
Specific version of atomic continuum with tabulated cross-section against
wavelength. Interpolated using weno4.
Attributes
----------
wavelengthGrid : list of float
Wavelengths at which cross-section is tabulated [nm].
alphaGrid : list of float
Tabulated cross-sections [m2].
'''
wavelengthGrid: Sequence[float]
alphaGrid: Sequence[float]
def setup(self, atom: AtomicModel):
super().setup(atom)
self.wavelengthGrid = np.asarray(self.wavelengthGrid) # type: ignore
if not np.all(np.diff(self.wavelengthGrid) > 0.0):
raise ValueError(('Wavelength array not monotonically'
' increasing in continuum %s') % repr(self))
self.alphaGrid = np.asarray(self.alphaGrid) # type: ignore
if self.lambdaEdge - self.wavelengthGrid[-1] > 0.01:
wav = np.concatenate((self.wavelengthGrid, np.array([self.lambdaEdge])))
self.wavelengthGrid = wav
self.alphaGrid = np.concatenate((self.alphaGrid, np.array([self.alphaGrid[-1]])))
def __repr__(self):
s = 'ExplicitContinuum(j=%d, i=%d, wavelengthGrid=%s, alphaGrid=%s)' % (self.j, self.i,
sequence_repr(self.wavelengthGrid), sequence_repr(self.alphaGrid))
return s
[docs]
def alpha(self, wavelength: np.ndarray) -> np.ndarray:
'''
Computes cross-section as a function of wavelength.
Parameters
----------
wavelength : np.ndarray
Wavelengths at which to compute the cross-section [nm].
Returns
-------
alpha : np.ndarray
Cross-section at associated wavelength.
'''
alpha = weno4(wavelength, self.wavelengthGrid, self.alphaGrid, left=0.0, right=0.0)
alpha[wavelength < self.minLambda] = 0.0
alpha[wavelength > self.lambdaEdge] = 0.0
alpha[alpha < 0.0] = 0.0
return alpha
[docs]
def wavelength(self) -> np.ndarray:
'''
Returns the wavelength grid at which this transition needs to be
computed to be correctly integrated. Specific handling is added to
ensure that it is treated properly close to the edge.
'''
grid = cast(np.ndarray, self.wavelengthGrid)
edge = self.lambdaEdge
result = np.copy(grid[(grid >= self.minLambda) & (grid <= edge)])
# NOTE(cmo): If the last value before the edge is more than 0.1 nm away
# then put the edge in.
if edge - grid[-1] > 0.1:
result = np.concatenate((result, (edge,)))
return result
@property
def minLambda(self) -> float:
'''
The minimum wavelength at which this transition contributes.
'''
return self.wavelengthGrid[0]
[docs]
@dataclass(eq=False)
class HydrogenicContinuum(AtomicContinuum):
'''
Specific case of a Hydrogenic continuum, approximately falling off as
1/nu**3 towards higher frequencies (additional effects from Gaunt
factor).
Attributes
----------
NlambaGen : int
The number of points to generate for the wavelength grid.
alpha0 : float
The cross-section at the edge wavelength [m2].
minWavelength : float
The minimum wavelength below which this transition is assumed to no
longer contribute [nm].
'''
NlambdaGen: int
alpha0: float
minWavelength: float
def __repr__(self):
s = ('HydrogenicContinuum(j=%d, i=%d, NlambdaGen=%d, alpha0=%g,'
' minWavelength=%g)') % (self.j, self.i, self.NlambdaGen, self.alpha0,
self.minWavelength)
return s
def setup(self, atom):
super().setup(atom)
if self.minLambda >= self.lambda0:
raise ValueError(('Minimum wavelength is larger than continuum edge '
'at %g [nm] in continuum %s') % (self.lambda0, repr(self)))
[docs]
def alpha(self, wavelength: np.ndarray) -> np.ndarray:
'''
Computes cross-section as a function of wavelength.
Parameters
----------
wavelength : np.ndarray
Wavelengths at which to compute the cross-section [nm].
Returns
-------
alpha : np.ndarray
Cross-section at associated wavelength.
'''
Z = self.jLevel.stage
nEff = Z * np.sqrt(Const.ERydberg / (self.jLevel.E_SI - self.iLevel.E_SI))
gbf0 = gaunt_bf(self.lambda0, nEff, Z)
gbf = gaunt_bf(wavelength, nEff, Z)
alpha = self.alpha0 * gbf / gbf0 * (wavelength / self.lambda0)**3
alpha[wavelength < self.minLambda] = 0.0
alpha[wavelength > self.lambdaEdge] = 0.0
return alpha
[docs]
def wavelength(self) -> np.ndarray:
'''
Returns the wavelength grid at which this transition needs to be
computed to be correctly integrated.
'''
return np.linspace(self.minLambda, self.lambdaEdge, self.NlambdaGen)
@property
def minLambda(self) -> float:
'''
The minimum wavelength at which this transition contributes.
'''
return self.minWavelength