import warnings
from collections.abc import Callable
from mpi4py import MPI
from petsc4py import PETSc
import adios4dolfinx
import basix
import dolfinx
import numpy as np
import numpy.typing as npt
import tqdm.auto
import ufl
from dolfinx import fem
from packaging.version import Version
from scifem import BlockedNewtonSolver
from festim import (
boundary_conditions,
exports,
k_B,
problem,
)
from festim import (
reaction as _reaction,
)
from festim import source as _source
from festim import (
species as _species,
)
from festim import (
subdomain as _subdomain,
)
from festim.advection import AdvectionTerm
from festim.helpers import (
KSPMonitor,
SnesMonitor,
as_fenics_constant,
convergenceTest,
get_interpolation_points,
is_it_time_to_export,
nmm_interpolate,
)
from .mesh import CoordinateSystem, Mesh
__all__ = [
"HydrogenTransportProblem",
"HydrogenTransportProblemDiscontinuous",
]
[docs]
class HydrogenTransportProblem(problem.ProblemBase):
"""Hydrogen Transport Problem.
Args:
mesh: The mesh
subdomains: list containing the subdomains
species: list containing the species
reactions: list containing the reactions
temperature: The temperature or a function describing the temperature as
a model of either space or space and time. Unit (K)
sources: The hydrogen sources
initial_conditions: The initial conditions
boundary_conditions: The boundary conditions
exports (list of festim.Export): the exports of the model
traps (list of F.Trap): the traps of the model
advection_terms: the advection terms of the model
Attributes:
mesh : The mesh
subdomains: The subdomains
species: The species
reactions: the reaction
temperature: The temperature in unit `K`
sources: The hydrogen sources
initial_conditions: The initial conditions
boundary_conditions: list of Dirichlet boundary conditions
exports (list of festim.Export): the export
traps (list of F.Trap): the traps of the model
advection_terms: the advection terms of the model
dx (dolfinx.fem.dx): the volume measure of the model
ds (dolfinx.fem.ds): the surface measure of the model
function_space (dolfinx.fem.FunctionSpaceBase): the function space of the
model
facet_meshtags (dolfinx.mesh.MeshTags): the facet meshtags of the model
volume_meshtags (dolfinx.mesh.MeshTags): the volume meshtags of the
model
formulation (ufl.form.Form): the formulation of the model
solver (dolfinx.nls.newton.NewtonSolver): the solver of the model
temperature_fenics (fem.Constant or fem.Function): the
temperature of the model as a fenics object (fem.Constant or
fem.Function).
temperature_expr (fem.Expression): the expression of the temperature
that is used to update the temperature_fenics
temperature_time_dependent (bool): True if the temperature is time
dependent
V_DG_0 (dolfinx.fem.FunctionSpaceBase): A DG function space of degree 0
over domain
V_DG_1 (dolfinx.fem.FunctionSpaceBase): A DG function space of degree 1
over domain
volume_subdomains (list of festim.VolumeSubdomain): the volume subdomains
of the model
surface_subdomains (list of festim.SurfaceSubdomain): the surface subdomains
of the model
Examples:
Can be used as either
.. highlight:: python
.. code-block:: python
import festim as F
my_model = F.HydrogenTransportProblem()
my_model.mesh = F.Mesh(...)
my_model.subdomains = [F.Subdomain(...)]
my_model.species = [F.Species(name="H"), F.Species(name="Trap")]
my_model.temperature = 500
my_model.sources = [F.ParticleSource(...)]
my_model.boundary_conditions = [F.BoundaryCondition(...)]
my_model.initialise()
or
.. highlight:: python
.. code-block:: python
my_model = F.HydrogenTransportProblem(
mesh=F.Mesh(...),
subdomains=[F.Subdomain(...)],
species=[F.Species(name="H"), F.Species(name="Trap")],
)
my_model.initialise()
"""
_temperature_as_function: fem.Function
_species_to_D_global: dict[_species.Species, fem.Function]
_species_to_D_global_expr: dict[_species.Species, fem.Expression]
def __init__(
self,
mesh: Mesh | None = None,
subdomains: (
list[_subdomain.VolumeSubdomain | _subdomain.SurfaceSubdomain] | None
) = None,
species: list[_species.Species] | None = None,
reactions: list[_reaction.Reaction] | None = None,
temperature: (
float
| int
| fem.Constant
| fem.Function
| Callable[
[npt.NDArray[dolfinx.default_scalar_type]], # type: ignore
npt.NDArray[dolfinx.default_scalar_type], # type: ignore
]
| Callable[
[npt.NDArray[dolfinx.default_scalar_type], fem.Constant], # type: ignore
npt.NDArray[dolfinx.default_scalar_type], # type: ignore
]
| None
) = None,
sources=None,
initial_conditions=None,
boundary_conditions=None,
settings=None,
exports=None,
traps=None,
advection_terms=None,
petsc_options=None,
element_immobile: str = "CG",
):
super().__init__(
mesh=mesh,
sources=sources,
exports=exports,
subdomains=subdomains,
boundary_conditions=boundary_conditions,
settings=settings,
petsc_options=petsc_options,
)
self.species = species or []
self.temperature = temperature
self.reactions = reactions or []
self.initial_conditions = initial_conditions or []
self.traps = traps or []
self.advection_terms = advection_terms or []
self.temperature_fenics = None
self._element_immobile = element_immobile
self._temperature_as_function = None
self._species_to_D_global = None
self._species_to_D_global_expr = None
@property
def temperature(self):
return self._temperature
@temperature.setter
def temperature(self, value):
if value is None:
self._temperature = value
elif isinstance(value, float | int | fem.Constant | fem.Function):
self._temperature = value
elif callable(value):
self._temperature = value
else:
raise TypeError(
"Value must be a float, int, fem.Constant, fem.Function, or callable"
)
@property
def temperature_fenics(self):
return self._temperature_fenics
@temperature_fenics.setter
def temperature_fenics(self, value):
if value is None:
self._temperature_fenics = value
return
elif not isinstance(
value,
fem.Constant | fem.Function,
):
raise TypeError("Value must be a fem.Constant or fem.Function")
self._temperature_fenics = value
@property
def temperature_time_dependent(self):
if self.temperature is None:
return False
if isinstance(self.temperature, fem.Constant | fem.Function):
return False
if callable(self.temperature):
arguments = self.temperature.__code__.co_varnames
return "t" in arguments
else:
return False
@property
def species(self) -> list[_species.Species]:
return self._species
@species.setter
def species(self, value):
# check that all species are of type festim.Species
for spe in value:
if not isinstance(spe, _species.Species):
raise TypeError(
f"elements of species must be of type festim.Species not "
f"{type(spe)}"
)
self._species = value
@property
def facet_meshtags(self):
return self._facet_meshtags
@facet_meshtags.setter
def facet_meshtags(self, value):
if value is None:
self._facet_meshtags = value
elif isinstance(value, dolfinx.mesh.MeshTags):
self._facet_meshtags = value
else:
raise TypeError("value must be of type dolfinx.mesh.MeshTags")
@property
def volume_meshtags(self):
return self._volume_meshtags
@volume_meshtags.setter
def volume_meshtags(self, value):
if value is None:
self._volume_meshtags = value
elif isinstance(value, dolfinx.mesh.MeshTags):
self._volume_meshtags = value
else:
raise TypeError("value must be of type dolfinx.mesh.MeshTags")
@property
def _unpacked_bcs(self):
"""Returns all boundary conditions, including fluxes from surface reactions."""
all_boundary_conditions = []
for bc in self.boundary_conditions:
if isinstance(bc, boundary_conditions.SurfaceReactionBC):
all_boundary_conditions.extend(bc.flux_bcs)
else:
all_boundary_conditions.append(bc)
return all_boundary_conditions
@property
def element_immobile(self):
return self._element_immobile
@element_immobile.setter
def element_immobile(self, value):
allowed_values = ["DG", "CG", "P"]
if value not in allowed_values:
raise ValueError(f"element_immobile should be in {allowed_values}")
if value == "P":
value = "CG"
self._element_immobile = value
def initialise(self):
self.create_species_from_traps()
self.define_function_spaces(element_degree=self.settings.element_degree)
self.define_meshtags_and_measures()
self.assign_functions_to_species()
self.t = fem.Constant(self.mesh.mesh, 0.0)
if self.settings.transient:
# TODO should raise error if no stepsize is provided
# TODO Should this be an attribute of festim.Stepsize?
self._dt = as_fenics_constant(
self.settings.stepsize.initial_value, self.mesh.mesh
)
self.create_implicit_species_value_fenics()
self.define_temperature()
self.define_boundary_conditions()
self.convert_source_input_values_to_fenics_objects()
self.convert_advection_term_to_fenics_objects()
self.create_flux_values_fenics()
self.create_initial_conditions()
self.create_formulation()
self.create_solver()
self.initialise_exports()
[docs]
def create_implicit_species_value_fenics(self):
"""For each implicit species, create the value_fenics."""
for reaction in self.reactions:
for reactant in reaction.reactant:
if isinstance(reactant, _species.ImplicitSpecies):
reactant.create_value_fenics(
mesh=self.mesh.mesh,
t=self.t,
)
[docs]
def create_species_from_traps(self):
"""Generate a species and reaction per trap defined in self.traps."""
for trap in self.traps:
trap.create_species_and_reaction()
self.species.append(trap.trapped_concentration)
self.reactions.append(trap.reaction)
[docs]
def define_temperature(self):
"""Sets the value of temperature_fenics_value.
The type depends on self.temperature. If self.temperature is a function on t
only, create a fem.Constant. Else, create an dolfinx.fem.Expression (stored in
self.temperature_expr) to be updated, a dolfinx.fem.Function object is created
from the Expression (stored in self.temperature_fenics_value). Raise a
ValueError if temperature is None.
"""
# check if temperature is None
if self.temperature is None:
raise ValueError("the temperature attribute needs to be defined")
# if temperature is a float or int, create a fem.Constant
elif isinstance(self.temperature, float | int):
self.temperature_fenics = as_fenics_constant(
self.temperature, self.mesh.mesh
)
# if temperature is a fem.Constant or function, pass it to temperature_fenics
elif isinstance(self.temperature, fem.Constant | fem.Function):
self.temperature_fenics = self.temperature
# if temperature is callable, process accordingly
elif callable(self.temperature):
arguments = self.temperature.__code__.co_varnames
if "t" in arguments and "x" not in arguments:
if not isinstance(self.temperature(t=float(self.t)), float | int):
raise ValueError(
f"self.temperature should return a float or an int, not "
f"{type(self.temperature(t=float(self.t)))} "
)
# only t is an argument
self.temperature_fenics = as_fenics_constant(
mesh=self.mesh.mesh, value=self.temperature(t=float(self.t))
)
else:
x = ufl.SpatialCoordinate(self.mesh.mesh)
degree = 1
element_temperature = basix.ufl.element(
basix.ElementFamily.P,
self.mesh.mesh.basix_cell(),
degree,
basix.LagrangeVariant.equispaced,
)
function_space_temperature = fem.functionspace(
self.mesh.mesh, element_temperature
)
self.temperature_fenics = fem.Function(function_space_temperature)
self.temperature_fenics.name = "temperature"
kwargs = {}
if "t" in arguments:
kwargs["t"] = self.t
if "x" in arguments:
kwargs["x"] = x
# store the expression of the temperature
# to update the temperature_fenics later
self.temperature_expr = fem.Expression(
self.temperature(**kwargs),
get_interpolation_points(function_space_temperature.element),
)
self.temperature_fenics.interpolate(self.temperature_expr)
[docs]
def initialise_exports(self):
"""Defines the export writers of the model, if field is given as a string, find
species object in self.species."""
for export in self.exports:
if isinstance(export, exports.ExportBaseClass):
if export.times:
for time in export.times:
if time not in self.settings.stepsize.milestones:
msg = "To ensure that the exports data at the desired times"
msg += "the values in export.times are added to milestones"
warnings.warn(msg)
self.settings.stepsize.milestones.append(time)
self.settings.stepsize.milestones.sort()
if isinstance(export, exports.VTXTemperatureExport):
self._temperature_as_function = (
self._get_temperature_field_as_function()
)
export.writer = dolfinx.io.VTXWriter(
comm=self._temperature_as_function.function_space.mesh.comm,
filename=export.filename,
output=self._temperature_as_function,
engine="BP5",
)
continue
elif isinstance(export, exports.VTXSpeciesExport):
functions = export.get_functions()
if not export._checkpoint:
export.writer = dolfinx.io.VTXWriter(
comm=functions[0].function_space.mesh.comm,
filename=export.filename,
output=functions,
engine="BP5",
)
else:
adios4dolfinx.write_mesh(export.filename, mesh=self.mesh.mesh)
elif isinstance(export, exports.CustomFieldExport):
export.function = fem.Function(self.V_CG_1)
export.set_dolfinx_expression(
temperature=self.temperature_fenics,
time=self.t,
)
export.writer = dolfinx.io.VTXWriter(
comm=export.function.function_space.mesh.comm,
filename=export.filename,
output=export.function,
engine="BP5",
)
continue
elif isinstance(export, exports.DerivedQuantity):
# raise not implemented error if the derived quantity don't match the
# type of mesh eg. SurfaceFlux is used with cylindrical mesh
if self.mesh.coordinate_system != CoordinateSystem.CARTESIAN:
raise NotImplementedError(
f"Derived quantity exports are not implemented for "
f"{self.mesh.coordinate_system!s} meshes"
)
# if name of species is given then replace with species object
if hasattr(export, "field"):
if isinstance(export.field, list):
for idx, field in enumerate(export.field):
if isinstance(field, str):
export.field[idx] = _species.find_species_from_name(
field, self.species
)
elif isinstance(export.field, str):
export.field = _species.find_species_from_name(
export.field, self.species
)
# Initialize XDMFFile for writer
if isinstance(export, exports.XDMFExport):
export.define_writer(MPI.COMM_WORLD)
# clean data for profile1D export
if isinstance(export, exports.Profile1DExport):
export.data = []
export.t = []
# compute diffusivity function for surface fluxes
# TODO: probably a better way to handle things would be to follow what's done in
# https://jsdokken.com/dolfinx-tutorial/chapter3/subdomains.html
spe_to_D_global_func_expr = {
spe: self.define_D_global(spe) for spe in self.species if spe.mobile
}
self._species_to_D_global_expr = {
k: v[1] for k, v in spe_to_D_global_func_expr.items()
} # links species to D expression
self._species_to_D_global = {
k: v[0] for k, v in spe_to_D_global_func_expr.items()
} # links species to global D function
for export in self.exports:
if isinstance(export, exports.SurfaceQuantity):
# add the global D to the export
export.D = self._species_to_D_global.get(export.field)
export.D_expr = self._species_to_D_global_expr.get(export.field)
if isinstance(export, exports.MaximumVolume | exports.MinimumVolume):
export.volume_meshtags = self.volume_meshtags
if isinstance(export, exports.MaximumSurface | exports.MinimumSurface):
export.facet_meshtags = self.facet_meshtags
# reset the data and time for SurfaceQuantity and VolumeQuantity
if isinstance(export, exports.DerivedQuantity):
export.t = []
export.data = []
if isinstance(export, exports.CustomQuantity):
kwargs = {
species.name: species.post_processing_solution
for species in self.species
}
kwargs["n"] = ufl.FacetNormal(self.mesh.mesh)
kwargs["t"] = self.t
kwargs["T"] = self.temperature_fenics
# NOTE we need to change our D_global approach
D_kwargs = {
f"D_{sp.name}": self._species_to_D_global[sp] for sp in self.species
}
kwargs.update(D_kwargs)
kwargs["D"] = {sp.name: D_kwargs[f"D_{sp.name}"] for sp in self.species}
if len(self.species) == 1:
kwargs["D"] = kwargs[f"D_{self.species[0].name}"]
kwargs["x"] = ufl.SpatialCoordinate(self.mesh.mesh)
export.ufl_expr = export.expr(**kwargs)
def _get_temperature_field_as_function(self) -> dolfinx.fem.Function:
"""Based on the type of the temperature_fenics attribute, converts it as a
Function to be used in VTX export.
Returns:
the temperature field of the simulation
"""
if isinstance(self.temperature_fenics, fem.Function):
return self.temperature_fenics
elif isinstance(self.temperature_fenics, fem.Constant):
# use existing function space if function already exists
if self._temperature_as_function is None:
V = dolfinx.fem.functionspace(self.mesh.mesh, ("P", 1))
else:
V = self._temperature_as_function.function_space
temperature_field = dolfinx.fem.Function(V)
temperature_expr = fem.Expression(
self.temperature_fenics,
get_interpolation_points(V.element),
)
temperature_field.interpolate(temperature_expr)
return temperature_field
[docs]
def define_D_global(self, species):
"""Defines the global diffusion coefficient for a given species.
Args:
species (F.Species): the species
Returns:
dolfinx.fem.Function, dolfinx.fem.Expression: the global diffusion
coefficient and the expression of the global diffusion coefficient
for a given species
"""
assert isinstance(species, _species.Species)
# create global D function
D = fem.Function(self.V_DG_1)
# if diffusion coeffient has been given as a function, use that
if self.volume_subdomains[0].material.D:
if len(self.volume_subdomains) > 1:
raise NotImplementedError(
"Giving the diffusion coefficient as a function is currently "
"only supported for a single volume subdomain case"
)
return self.volume_subdomains[0].material.D, None
D_0 = fem.Function(self.V_DG_0)
E_D = fem.Function(self.V_DG_0)
for vol in self.volume_subdomains:
cell_indices = self.volume_meshtags.find(vol.id)
# replace values of D_0 and E_D by values from the material
D_0.x.array[cell_indices] = vol.material.get_D_0(species=species)
E_D.x.array[cell_indices] = vol.material.get_E_D(species=species)
expr = D_0 * ufl.exp(
-E_D / as_fenics_constant(k_B, self.mesh.mesh) / self.temperature_fenics
)
D_expr = fem.Expression(expr, get_interpolation_points(self.V_DG_1.element))
D.interpolate(D_expr)
return D, D_expr
[docs]
def define_function_spaces(self, element_degree: int = 1):
"""Creates the function space of the modelw with a mixed element. Creates the
main solution and previous solution function u and u_n. Create global DG
function spaces of degree 0 and 1 for the global diffusion coefficient.
Args:
element_degree: Degree order for finite element. Defaults to 1.
"""
element_CG = basix.ufl.element(
basix.ElementFamily.P,
self.mesh.mesh.basix_cell(),
element_degree,
basix.LagrangeVariant.equispaced,
)
element_DG = basix.ufl.element(
"DG",
self.mesh.mesh.basix_cell(),
element_degree,
basix.LagrangeVariant.equispaced,
)
elements = []
for spe in self.species:
if isinstance(spe, _species.Species):
if spe.mobile:
elements.append(element_CG)
else:
match self._element_immobile:
case "DG":
elements.append(element_DG)
case "CG":
elements.append(element_CG)
element = basix.ufl.mixed_element(elements)
self.function_space = fem.functionspace(self.mesh.mesh, element)
# create global DG function spaces of degree 0 and 1
element_DG0 = basix.ufl.element(
"DG",
self.mesh.mesh.basix_cell(),
0,
basix.LagrangeVariant.equispaced,
)
element_DG1 = basix.ufl.element(
"DG",
self.mesh.mesh.basix_cell(),
1,
basix.LagrangeVariant.equispaced,
)
self.V_DG_0 = fem.functionspace(self.mesh.mesh, element_DG0)
self.V_DG_1 = fem.functionspace(self.mesh.mesh, element_DG1)
self.V_CG_1 = fem.functionspace(self.mesh.mesh, ("CG", 1))
self.u = fem.Function(self.function_space)
self.u_n = fem.Function(self.function_space)
[docs]
def assign_functions_to_species(self):
"""Creates the solution, prev solution, test function and post-processing
solution for each species, as well as a collapsed function space for each
species."""
sub_solutions = list(ufl.split(self.u))
sub_prev_solution = list(ufl.split(self.u_n))
sub_test_functions = list(ufl.TestFunctions(self.function_space))
for idx, spe in enumerate(self.species):
spe.sub_function_space = self.function_space.sub(idx)
spe.sub_function = self.u.sub(idx) # TODO add this to discontinuous class
spe.post_processing_solution = self.u.sub(idx).collapse()
spe.post_processing_solution.name = spe.name
spe.collapsed_function_space, spe.map_sub_to_main_solution = (
self.function_space.sub(idx).collapse()
)
for idx, spe in enumerate(self.species):
spe.solution = sub_solutions[idx]
spe.prev_solution = sub_prev_solution[idx]
spe.test_function = sub_test_functions[idx]
[docs]
def define_boundary_conditions(self):
"""Defines the boundary conditions of the model."""
for bc in self._unpacked_bcs:
if isinstance(bc.species, str):
# if name of species is given then replace with species object
bc.species = _species.find_species_from_name(bc.species, self.species)
if isinstance(bc, boundary_conditions.ParticleFluxBC):
bc.create_value_fenics(
mesh=self.mesh.mesh,
temperature=self.temperature_fenics,
t=self.t,
)
super().define_boundary_conditions()
[docs]
def convert_advection_term_to_fenics_objects(self):
"""For each advection term convert the input value."""
for advec_term in self.advection_terms:
advec_term.velocity.convert_input_value(
function_space=self.function_space, t=self.t
)
[docs]
def create_flux_values_fenics(self):
"""For each particle flux create the value_fenics."""
for bc in self.boundary_conditions:
# create value_fenics for all F.ParticleFluxBC objects
if isinstance(bc, boundary_conditions.ParticleFluxBC):
bc.create_value_fenics(
mesh=self.mesh.mesh,
temperature=self.temperature_fenics,
t=self.t,
)
[docs]
def create_initial_conditions(self):
"""For each initial condition, create the value_fenics and assign it to the
previous solution of the condition's species."""
if len(self.initial_conditions) > 0 and not self.settings.transient:
raise ValueError(
"Initial conditions can only be defined for transient simulations"
)
for condition in self.initial_conditions:
function_space_value = None
if callable(condition.value):
# if bc.value is a callable then need to provide a functionspace
function_space_value = condition.species.collapsed_function_space
condition.create_expr_fenics(
mesh=self.mesh.mesh,
temperature=self.temperature_fenics,
function_space=function_space_value,
)
# assign to previous solution of species
entities = self.volume_meshtags.find(condition.volume.id)
idx = self.species.index(condition.species)
function_to_interpolate_on = self.u_n.sub(idx)
function_to_interpolate_on.interpolate(
condition.expr_fenics, cells0=entities
)
def update_time_dependent_values(self):
super().update_time_dependent_values()
t = float(self.t)
for reaction in self.reactions:
for reactant in reaction.reactant:
if isinstance(reactant, _species.ImplicitSpecies):
reactant.update_density(t=t)
if (
isinstance(self.temperature, fem.Function)
or self.temperature_time_dependent
):
for bc in self.boundary_conditions:
if isinstance(
bc,
boundary_conditions.FixedConcentrationBC
| boundary_conditions.ParticleFluxBC,
):
if bc.temperature_dependent:
bc.update(t=t)
for source in self.sources:
if source.value.temperature_dependent:
source.value.update(t=t)
if self.temperature_time_dependent:
if isinstance(self.temperature_fenics, fem.Constant):
self.temperature_fenics.value = self.temperature(t=t)
elif isinstance(self.temperature_fenics, fem.Function):
self.temperature_fenics.interpolate(self.temperature_expr)
for advec_term in self.advection_terms:
if advec_term.velocity.explicit_time_dependent:
advec_term.velocity.update(t=t)
[docs]
def update_post_processing_solutions(self):
"""Updates the post-processing solutions of each species."""
for spe in self.species:
spe.post_processing_solution.x.array[:] = self.u.x.array[
spe.map_sub_to_main_solution
]
[docs]
def post_processing(self):
"""Post processes the model."""
self.update_post_processing_solutions()
if self.temperature_time_dependent:
# update global D if temperature time dependent or internal
# variables time dependent
# TODO: honestly, we probably don't need to do this at all
# SurfaceFlux quantities should use ufl.Expr for D instead of a fem.Function
for spe, D_global in self._species_to_D_global.items():
D_global.interpolate(self._species_to_D_global_expr[spe])
for export in self.exports:
# skip if it isn't time to export
if hasattr(export, "times"):
if not is_it_time_to_export(
current_time=float(self.t), times=export.times
):
continue
# handle VTX exports
if isinstance(export, exports.ExportBaseClass):
if isinstance(export, exports.VTXSpeciesExport):
if export._checkpoint:
for field in export.field:
adios4dolfinx.write_function(
export.filename,
field.post_processing_solution,
time=float(self.t),
name=field.name,
)
else:
export.writer.write(float(self.t))
elif (
isinstance(export, exports.VTXTemperatureExport)
and self.temperature_time_dependent
):
self._temperature_as_function.interpolate(
self._get_temperature_field_as_function()
)
export.writer.write(float(self.t))
elif isinstance(export, exports.CustomFieldExport):
# update internal function
export.function.interpolate(export.dolfinx_expression)
export.writer.write(float(self.t))
# TODO if export type derived quantity
if isinstance(export, exports.SurfaceQuantity):
if isinstance(
export,
exports.SurfaceFlux | exports.TotalSurface | exports.AverageSurface,
):
if len(self.advection_terms) > 0:
warnings.warn(
"Advection terms are not currently accounted for in the "
"evaluation of surface flux values"
)
export.compute(export.field.solution, self.ds)
else:
export.compute()
# update export data
export.t.append(float(self.t))
# if filename given write export data to file
if export.filename is not None:
export.write(t=float(self.t))
elif isinstance(export, exports.VolumeQuantity):
if isinstance(export, exports.TotalVolume | exports.AverageVolume):
export.compute(u=export.field.solution, dx=self.dx)
else:
export.compute()
# update export data
export.t.append(float(self.t))
# if filename given write export data to file
if export.filename is not None:
export.write(t=float(self.t))
elif isinstance(export, exports.CustomQuantity):
is_surface = isinstance(export.subdomain, _subdomain.SurfaceSubdomain)
measure = self.ds if is_surface else self.dx
export.compute(measure)
# update export data
export.t.append(float(self.t))
# if filename given write export data to file
if export.filename is not None:
export.write(t=float(self.t))
if isinstance(export, exports.XDMFExport):
export.write(float(self.t))
if isinstance(export, exports.Profile1DExport):
# computing dofs at each time step is costly so storing it in the export
if export._dofs is None:
index = self.species.index(export.field)
V0, export._dofs = self.u.function_space.sub(index).collapse()
coords = V0.tabulate_dof_coordinates()[:, 0]
export._sort_coords = np.argsort(coords)
x = coords[export._sort_coords]
export.x = x
c = self.u.x.array[export._dofs][export._sort_coords]
export.data.append(c)
export.t.append(float(self.t))
[docs]
class HydrogenTransportProblemDiscontinuous(HydrogenTransportProblem):
interfaces: list[_subdomain.Interface]
surface_to_volume: dict
_method_interface: _subdomain.interface.InterfaceMethod = (
_subdomain.interface.InterfaceMethod.penalty
)
subdomain_to_species: dict
def __init__(
self,
mesh=None,
subdomains=None,
species=None,
reactions=None,
temperature=None,
sources=None,
initial_conditions=None,
boundary_conditions=None,
settings=None,
exports=None,
traps=None,
interfaces: list[_subdomain.Interface] | None = None,
petsc_options: dict | None = None,
):
"""Class for a multi-material hydrogen transport problem For other arguments see
``festim.HydrogenTransportProblem``.
Args:
interfaces (list, optional): list of interfaces (``festim.Interface``
objects). Defaults to None.
surface_to_volume (dict, optional): correspondance dictionary linking
each ``festim.SurfaceSubdomain`` objects to a ``festim.VolumeSubdomain``
object). Defaults to None.
petsc_options (dict, optional): petsc options to be passed to the
``festim.NewtonSolver`` object. If None, the default options are:
```
default_petsc_options = {
"ksp_type": "preonly",
"pc_type": "lu",
"pc_factor_mat_solver_type": "mumps",
}
```
Defaults to None.
"""
super().__init__(
mesh,
subdomains,
species,
reactions,
temperature,
sources,
initial_conditions,
boundary_conditions,
settings,
exports,
traps,
petsc_options=petsc_options,
)
self.interfaces = interfaces or []
self.surface_to_volume = {}
self.subdomain_to_species = {} # maps subdomain to species defined in it
self.subdomain_to_V_CG1 = {}
@property
def method_interface(self):
# deprecation warning
warnings.warn(
"The method_interface attribute of the Problem class is deprecated, "
"please use the method_interface attribute of each interface instead",
DeprecationWarning,
)
return self._method_interface
@method_interface.setter
def method_interface(self, value):
if isinstance(value, _subdomain.interface.InterfaceMethod):
self._method_interface = value
elif isinstance(value, str):
self._method_interface = _subdomain.interface.InterfaceMethod.from_string(
value
)
else:
raise TypeError("method_interface must be of type str or InterfaceMethod")
def initialise(self):
# if method_interface is given as an attribute of Problem class, then pass it to
# each interface and raise a deprecation warning
if hasattr(self, "method_interface"):
warnings.warn(
"The method_interface attribute of the Problem class is deprecated, "
"please set the method_interface attribute of each interface instead",
DeprecationWarning,
)
for interface in self.interfaces:
interface.method = self.method_interface
# check that all species have a list of F.VolumeSubdomain as this is
# different from F.HydrogenTransportProblem
for spe in self.species:
if not spe.subdomains:
raise ValueError(
f"Species {spe.name} must have a list of subdomains defined "
"in 'subdomains' attribute for discontinuous problem"
)
if not isinstance(spe.subdomains, list):
raise TypeError("subdomains attribute in Species should be list")
self.define_meshtags_and_measures()
if self.surface_to_volume:
# tell users that this is no longer required
warnings.warn(
f"The surface_to_volume attribute of the {self.__class__.__name__}"
" class is no longer required and can be removed."
"The mapping between surface and volume subdomains is now done"
"automatically based on the connectivity of the mesh and the meshtags",
DeprecationWarning,
)
else:
facet_to_cell = self.mesh.mesh.topology.connectivity(
self.mesh.mesh.topology.dim - 1, self.mesh.mesh.topology.dim
)
self.surface_to_volume = _subdomain.map_surface_to_volume_subdomains(
ft=self.facet_meshtags,
ct=self.volume_meshtags,
facet_to_cell=facet_to_cell,
volume_subdomains=self.volume_subdomains,
surface_subdomains=self.surface_subdomains,
comm=self.mesh.mesh.comm,
)
# create submeshes and transfer meshtags to subdomains
for subdomain in self.volume_subdomains:
subdomain.create_subdomain(self.mesh.mesh, self.volume_meshtags)
subdomain.transfer_meshtag(self.mesh.mesh, self.facet_meshtags)
for interface in self.interfaces:
interface.mt = self.volume_meshtags
interface.parent_mesh = self.mesh.mesh
self.create_species_from_traps()
self.t = fem.Constant(self.mesh.mesh, 0.0)
if self.settings.transient:
# TODO should raise error if no stepsize is provided
# TODO Should this be an attribute of festim.Stepsize?
self._dt = as_fenics_constant(
self.settings.stepsize.initial_value, self.mesh.mesh
)
self.create_implicit_species_value_fenics()
for subdomain in self.volume_subdomains:
self.define_function_spaces(subdomain)
# create global DG function spaces of degree 0 and 1
element_DG0 = basix.ufl.element(
"DG",
self.mesh.mesh.basix_cell(),
0,
basix.LagrangeVariant.equispaced,
)
element_DG1 = basix.ufl.element(
"DG",
self.mesh.mesh.basix_cell(),
1,
basix.LagrangeVariant.equispaced,
)
self.V_DG_0 = fem.functionspace(self.mesh.mesh, element_DG0)
self.V_DG_1 = fem.functionspace(self.mesh.mesh, element_DG1)
self.define_temperature()
self.convert_source_input_values_to_fenics_objects()
self.convert_advection_term_to_fenics_objects()
self.define_boundary_conditions()
self.create_flux_values_fenics()
self.create_initial_conditions()
for subdomain in self.volume_subdomains:
self.create_subdomain_formulation(subdomain)
subdomain.u.name = f"u_{subdomain.id}"
self.create_formulation()
self.create_solver()
self.initialise_exports()
[docs]
def define_temperature(self):
super().define_temperature()
# NOTE this won't be needed anymore when https://github.com/FEniCS/dolfinx/pull/4140
# is released
# because dolfinx.fem.Expressions cannot work with submeshes
# (ie. mixing parent and submesh),
# we need to create "sub" temperature functions for each subdomain
# pass temperature function to each subdomain
if isinstance(self.temperature_fenics, fem.Function):
for subdomain in self.volume_subdomains:
element_CG = basix.ufl.element(
basix.ElementFamily.P,
subdomain.submesh.basix_cell(),
1, # could expose?
basix.LagrangeVariant.equispaced,
)
V = dolfinx.fem.functionspace(subdomain.submesh, element_CG)
sub_T = dolfinx.fem.Function(V)
sub_T.name = "temperature"
from festim.helpers import nmm_interpolate
nmm_interpolate(f_out=sub_T, f_in=self.temperature_fenics)
subdomain.sub_T = sub_T
[docs]
def create_initial_conditions(self):
"""For each intial condition, create the value_fenics and assign it to the
previous solution of the condition's species."""
for condition in self.initial_conditions:
idx = self.species.index(condition.species)
# if the value given is a function, then directly interpolate it on the
# previous solution of the species
if isinstance(condition.value, fem.Function):
nmm_interpolate(condition.volume.u_n.sub(idx), condition.value)
else:
V = condition.species.subdomain_to_function_space[condition.volume]
condition.create_expr_fenics(
mesh=self.mesh.mesh,
temperature=self.temperature_fenics,
function_space=V,
)
# assign to previous solution of species
entities = self.volume_meshtags.find(condition.volume.id)
condition.volume.u_n.sub(idx).interpolate(
condition.expr_fenics, cells1=entities
)
[docs]
def define_function_spaces(
self, subdomain: _subdomain.VolumeSubdomain, element_degree=1
):
"""Creates appropriate function space and functions for a given subdomain
(submesh) based on the number of species existing in this subdomain. Then stores
the functionspace, the current solution (``u``) and the previous solution
(``u_n``) functions. It also populates the correspondance dicts attributes of
the species (eg. ``species.subdomain_to_solution``,
``species.subdomain_to_test_function``, etc) for easy access to the right
subfunctions, sub-testfunctions etc.
Args:
subdomain (F.VolumeSubdomain): a subdomain of the geometry
element_degree (int, optional): Degree order for finite element.
Defaults to 1.
"""
# get number of species defined in the subdomain
self.subdomain_to_species[subdomain] = [
species for species in self.species if subdomain in species.subdomains
]
# instead of using the set function we use a list to keep the order
unique_species = []
for species in self.subdomain_to_species[subdomain]:
if species not in unique_species:
unique_species.append(species)
nb_species = len(unique_species)
element_CG = basix.ufl.element(
basix.ElementFamily.P,
subdomain.submesh.basix_cell(),
element_degree,
basix.LagrangeVariant.equispaced,
)
element = basix.ufl.mixed_element([element_CG] * nb_species)
V = dolfinx.fem.functionspace(subdomain.submesh, element)
u = dolfinx.fem.Function(V)
u_n = dolfinx.fem.Function(V)
self.subdomain_to_V_CG1[subdomain] = dolfinx.fem.functionspace(
subdomain.submesh, ("CG", 1)
)
# store attributes in the subdomain object
subdomain.u = u
subdomain.u_n = u_n
# split the functions and assign the subfunctions to the species
us = list(ufl.split(u))
u_ns = list(ufl.split(u_n))
vs = list(ufl.TestFunctions(V))
for i, species in enumerate(unique_species):
species.subdomain_to_solution[subdomain] = us[i]
species.subdomain_to_prev_solution[subdomain] = u_ns[i]
species.subdomain_to_test_function[subdomain] = vs[i]
species.subdomain_to_function_space[subdomain] = V.sub(i)
species.subdomain_to_post_processing_solution[subdomain] = u.sub(
i
).collapse()
species.subdomain_to_collapsed_function_space[subdomain] = V.sub(
i
).collapse()
name = f"{species.name}_{subdomain.id}"
species.subdomain_to_post_processing_solution[subdomain].name = name
[docs]
def convert_advection_term_to_fenics_objects(self):
"""For each advection term convert the input value."""
for advec_term in self.advection_terms:
if isinstance(advec_term, AdvectionTerm):
for spe in advec_term.species:
V = spe.subdomain_to_function_space[advec_term.subdomain]
advec_term.velocity.convert_input_value(function_space=V, t=self.t)
[docs]
def define_boundary_conditions(self):
for bc in self._unpacked_bcs:
if isinstance(bc, boundary_conditions.ParticleFluxBC):
bc._volume_subdomain = self.surface_to_volume[bc.subdomain]
super().define_boundary_conditions()
[docs]
def create_subdomain_formulation(self, subdomain: _subdomain.VolumeSubdomain):
"""Creates the variational formulation for each subdomain and stores it in
``subdomain.F``
Args:
subdomain (F.VolumeSubdomain): a subdomain of the geometry
"""
form = 0
# add diffusion and time derivative for each species
for spe in self.species:
if subdomain not in spe.subdomains:
continue
u = spe.subdomain_to_solution[subdomain]
u_n = spe.subdomain_to_prev_solution[subdomain]
v = spe.subdomain_to_test_function[subdomain]
if self.settings.transient:
form += ((u - u_n) / self.dt) * v * self.dx(subdomain.id)
if spe.mobile:
D = subdomain.material.get_diffusion_coefficient(
self.mesh.mesh, self.temperature_fenics, spe
)
match self.mesh.coordinate_system:
case CoordinateSystem.CARTESIAN:
form += ufl.dot(D * ufl.grad(u), ufl.grad(v)) * self.dx(
subdomain.id
)
case CoordinateSystem.CYLINDRICAL:
r = ufl.SpatialCoordinate(self.mesh.mesh)[0]
form += (
r
* ufl.dot(D * ufl.grad(u), ufl.grad(v / r))
* self.dx(subdomain.id)
)
case CoordinateSystem.SPHERICAL:
r = ufl.SpatialCoordinate(self.mesh.mesh)[0]
form += (
r**2
* ufl.dot(D * ufl.grad(u), ufl.grad(v / r**2))
* self.dx(subdomain.id)
)
case _:
raise ValueError(
f"Unsupported coordinate system {self.mesh.coordinate_system}" # noqa: E501
)
# add reaction terms
for reaction in self.reactions:
if reaction.volume != subdomain:
continue
# reactant
for reactant in reaction.reactant:
if isinstance(reactant, _species.Species):
form += (
reaction.reaction_term(self.temperature_fenics)
* reactant.subdomain_to_test_function[subdomain]
* self.dx(subdomain.id)
)
# product
if isinstance(reaction.product, list):
products = reaction.product
else:
products = [reaction.product]
for product in products:
form += (
-reaction.reaction_term(self.temperature_fenics)
* product.subdomain_to_test_function[subdomain]
* self.dx(subdomain.id)
)
# add fluxes
for bc in self._unpacked_bcs:
if isinstance(bc, boundary_conditions.ParticleFluxBC):
# check that the bc is applied on a surface
# belonging to this subdomain
if subdomain == self.surface_to_volume[bc.subdomain]:
v = bc.species.subdomain_to_test_function[subdomain]
form -= bc.value_fenics * v * self.ds(bc.subdomain.id)
# add volumetric sources
for source in self.sources:
v = source.species.subdomain_to_test_function[subdomain]
if source.volume == subdomain:
form -= source.value.fenics_object * v * self.dx(subdomain.id)
# add advection
for adv_term in self.advection_terms:
if adv_term.subdomain != subdomain:
continue
for spe in adv_term.species:
v = spe.subdomain_to_test_function[subdomain]
conc = spe.subdomain_to_solution[subdomain]
vel = adv_term.velocity.fenics_object
# print(vel.x.array)
form += ufl.inner(ufl.dot(ufl.grad(conc), vel), v) * self.dx(
subdomain.id
)
# store the form in the subdomain object
subdomain.F = form
[docs]
def create_solver(self):
if Version(dolfinx.__version__) == Version("0.9.0"):
self.solver = BlockedNewtonSolver(
self.forms,
[subdomain.u for subdomain in self.volume_subdomains],
J=self.J,
bcs=self.bc_forms,
petsc_options=self.petsc_options,
)
self.solver.max_iterations = self.settings.max_iterations
self.solver.convergence_criterion = self.settings.convergence_criterion
self.solver.atol = (
self.settings.atol
if not callable(self.settings.atol)
else self.settings.atol(float(self.t))
)
self.solver.rtol = (
self.settings.rtol
if not callable(self.settings.rtol)
else self.settings.rtol(float(self.t))
)
elif Version(dolfinx.__version__) > Version("0.9.0"):
from dolfinx.fem.petsc import NonlinearProblem
petsc_options = self.get_petsc_options()
self.solver = NonlinearProblem(
self.forms,
[subdomain.u for subdomain in self.volume_subdomains],
bcs=self.bc_forms,
J=self.J,
petsc_options=petsc_options,
petsc_options_prefix="festim_solver",
)
self.solver.solver.setMonitor(SnesMonitor)
self.solver.solver.getKSP().setMonitor(KSPMonitor)
self.solver.solver.setConvergenceTest(convergenceTest)
# Delete PETSc options post setting them, ref:
# https://gitlab.com/petsc/petsc/-/issues/1201
snes = self.solver.solver
prefix = snes.getOptionsPrefix()
opts = PETSc.Options()
for k in petsc_options.keys():
del opts[f"{prefix}{k}"]
[docs]
def create_flux_values_fenics(self):
"""For each particle flux create the ``value_fenics`` attribute."""
for bc in self._unpacked_bcs:
if isinstance(bc, boundary_conditions.ParticleFluxBC):
volume_subdomain = self.surface_to_volume[bc.subdomain]
bc.create_value_fenics(
mesh=volume_subdomain.submesh,
temperature=self.temperature_fenics,
t=self.t,
)
[docs]
def initialise_exports(self):
for export in self.exports:
if isinstance(export, exports.VTXSpeciesExport):
functions = export.get_functions()
if not export._checkpoint:
export.writer = dolfinx.io.VTXWriter(
functions[0].function_space.mesh.comm,
export.filename,
functions,
engine="BP5",
)
else:
adios4dolfinx.write_mesh(
export.filename,
mesh=functions[0].function_space.mesh,
)
elif isinstance(export, exports.VTXTemperatureExport):
assert isinstance(self.temperature_fenics, fem.Function), (
"Temperature must be space-dependent to be exported as "
"VTXTemperatureExport"
)
export.writer = dolfinx.io.VTXWriter(
self.temperature_fenics.function_space.mesh.comm,
export.filename,
self.temperature_fenics,
engine="BP5",
)
elif isinstance(export, exports.CustomFieldExport):
# need to find an appropriate function space on the right submesh
V = self.subdomain_to_V_CG1[export.subdomain]
export.function = fem.Function(V)
export.set_dolfinx_expression(
# need to pass the right temperature
temperature=self.temperature_fenics,
time=self.t,
)
export.writer = dolfinx.io.VTXWriter(
comm=export.function.function_space.mesh.comm,
filename=export.filename,
output=export.function,
engine="BP5",
)
# compute diffusivity function for surface fluxes
# for the discontinuous case, we don't use D_global as in
# HydrogenTransportProblem
for export in self.exports:
if isinstance(export, exports.SurfaceQuantity):
volume = self.surface_to_volume[export.surface]
D = volume.material.get_diffusion_coefficient(
self.mesh.mesh, self.temperature_fenics, export.field
)
# NOTE: maybe we need to make sure there are no functionspace clashes?
export.D = D
# reset the data and time for SurfaceQuantity and VolumeQuantity
if isinstance(export, exports.DerivedQuantity):
export.t = []
export.data = []
if isinstance(export, exports.CustomQuantity):
volume = (
export.subdomain
if not isinstance(export.subdomain, _subdomain.SurfaceSubdomain)
else self.surface_to_volume[
export.subdomain
if isinstance(export.subdomain, _subdomain.SurfaceSubdomain)
else next(
s
for s in self.surface_subdomains
if s.id == export.subdomain
)
]
)
kwargs = {
species.name: species.subdomain_to_post_processing_solution[volume]
for species in self.species
}
kwargs["n"] = ufl.FacetNormal(self.mesh.mesh)
kwargs["t"] = self.t
kwargs["T"] = self.temperature_fenics
D_kwargs = {
f"D_{sp.name}": volume.material.get_diffusion_coefficient(
self.mesh.mesh, self.temperature_fenics, sp
)
for sp in self.species
}
kwargs.update(D_kwargs)
kwargs["D"] = {sp.name: D_kwargs[f"D_{sp.name}"] for sp in self.species}
if len(self.species) == 1:
kwargs["D"] = kwargs[f"D_{self.species[0].name}"]
kwargs["x"] = ufl.SpatialCoordinate(self.mesh.mesh)
export.ufl_expr = export.expr(**kwargs)
[docs]
def post_processing(self):
# update post-processing solutions (for each species in each subdomain)
# with new solution
for subdomain in self.volume_subdomains:
for species in self.species:
if subdomain not in species.subdomains:
continue
collapsed_function = species.subdomain_to_post_processing_solution[
subdomain
]
u = subdomain.u
v0_to_V = species.subdomain_to_collapsed_function_space[subdomain][1]
collapsed_function.x.array[:] = u.x.array[v0_to_V]
for export in self.exports:
# skip if it isn't time to export
if hasattr(export, "times"):
if not is_it_time_to_export(
current_time=float(self.t), times=export.times
):
continue
# handle VTX exports
if isinstance(export, exports.ExportBaseClass):
if isinstance(export, exports.CustomFieldExport):
# update internal function
export.function.interpolate(export.dolfinx_expression)
export.writer.write(float(self.t))
elif isinstance(export, exports.VTXSpeciesExport):
if export._checkpoint:
for species in export.field:
post_processing_solution = (
species.subdomain_to_post_processing_solution[
export._subdomain
]
)
adios4dolfinx.write_function(
export.filename,
post_processing_solution,
time=float(self.t),
name=species.name,
)
else:
export.writer.write(float(self.t))
elif isinstance(export, exports.VTXTemperatureExport):
export.writer.write(float(self.t))
else:
raise NotImplementedError(
f"Export type {type(export)} not implemented"
)
# handle derived quantities
if isinstance(export, exports.SurfaceQuantity):
if isinstance(
export,
exports.SurfaceFlux | exports.TotalSurface | exports.AverageSurface,
):
if len(self.advection_terms) > 0:
warnings.warn(
"Advection terms are not currently accounted for in the "
"evaluation of surface flux values"
)
export_surf = export.surface
export_vol = self.surface_to_volume[export_surf]
submesh_function = (
export.field.subdomain_to_post_processing_solution[export_vol]
)
try:
from dolfinx.mesh import EntityMap
entity_maps = [sd.cell_map for sd in self.volume_subdomains]
except ImportError:
entity_maps = {
sd.submesh: sd.parent_to_submesh
for sd in self.volume_subdomains
}
export.compute(
u=submesh_function, ds=self.ds, entity_maps=entity_maps
)
else:
export.compute()
elif isinstance(export, exports.VolumeQuantity):
if isinstance(export, exports.TotalVolume | exports.AverageVolume):
try:
from dolfinx.mesh import EntityMap
entity_maps = [sd.cell_map for sd in self.volume_subdomains]
except ImportError:
entity_maps = {
sd.submesh: sd.parent_to_submesh
for sd in self.volume_subdomains
}
export.compute(
u=export.field.subdomain_to_post_processing_solution[
export.volume
],
dx=self.dx,
entity_maps=entity_maps,
)
else:
export.compute()
elif isinstance(export, exports.CustomQuantity):
is_surface = isinstance(export.subdomain, _subdomain.SurfaceSubdomain)
measure = self.ds if is_surface else self.dx
# getting entity_maps
try:
from dolfinx.mesh import EntityMap # noqa: F401
entity_maps = [sd.cell_map for sd in self.volume_subdomains]
except ImportError:
entity_maps = {
sd.submesh: sd.parent_to_submesh
for sd in self.volume_subdomains
}
export.compute(measure, entity_maps=entity_maps)
if isinstance(export, exports.DerivedQuantity):
# update export data
export.t.append(float(self.t))
# if filename given write export data to file
if export.filename is not None:
export.write(t=float(self.t))
elif isinstance(export, exports.Profile1DExport):
assert export.subdomain, (
"Profile1DExport requires a subdomain to be set"
)
u = export.subdomain.u
if export._dofs is None:
index = self.subdomain_to_species[export.subdomain].index(
export.field
)
V0, export._dofs = u.function_space.sub(index).collapse()
coords = V0.tabulate_dof_coordinates()[:, 0]
export._sort_coords = np.argsort(coords)
x = coords[export._sort_coords]
export.x = x
c = u.x.array[export._dofs][export._sort_coords]
export.data.append(c)
export.t.append(float(self.t))
def update_time_dependent_values(self):
super().update_time_dependent_values()
# update sub_T if temperature is given as a function
if self.temperature_time_dependent:
if isinstance(self.temperature_fenics, fem.Function):
for subdomain in self.volume_subdomains:
temp = self.temperature_fenics
sub_T = subdomain.sub_T
from festim.helpers import nmm_interpolate
nmm_interpolate(f_out=sub_T, f_in=temp)
[docs]
def iterate(self):
"""Iterates the model for a given time step."""
if self.show_progress_bar:
self.progress_bar.update(
min(self.dt.value, abs(self.settings.final_time - self.t.value))
)
self.t.value += self.dt.value
self.update_time_dependent_values()
# Solve main problem
if Version(dolfinx.__version__) == Version("0.9.0"):
nb_its, _converged = self.solver.solve()
elif Version(dolfinx.__version__) > Version("0.9.0"):
_ = self.solver.solve()
converged_reason = self.solver.solver.getConvergedReason()
assert converged_reason > 0, (
f"Non-linear solver did not converge. Reason code: {converged_reason}. \n See https://petsc.org/release/manualpages/SNES/SNESConvergedReason/ for more information." # noqa: E501
)
nb_its = self.solver.solver.getIterationNumber()
# post processing
self.post_processing()
# update previous solution
for subdomain in self.volume_subdomains:
subdomain.u_n.x.array[:] = subdomain.u.x.array[:]
# adapt stepsize
if self.settings.stepsize.adaptive:
new_stepsize = self.settings.stepsize.modify_value(
value=self.dt.value, nb_iterations=nb_its, t=self.t.value
)
self.dt.value = new_stepsize
[docs]
def run(self):
if self.settings.transient:
# Solve transient
if self.show_progress_bar:
self.progress_bar = tqdm.auto.tqdm(
desc=f"Solving {self.__class__.__name__}",
total=self.settings.final_time,
unit_scale=True,
)
while self.t.value < self.settings.final_time:
self.iterate()
if self.show_progress_bar:
self.progress_bar.refresh() # refresh progress bar to show 100%
else:
# Solve steady-state
self.solver.solve()
self.post_processing()
def __del__(self):
for export in self.exports:
if isinstance(export, exports.ExportBaseClass):
if hasattr(export, "writer") and export.writer is not None:
export.writer.close()
class HydrogenTransportProblemDiscontinuousChangeVar(HydrogenTransportProblem):
species: list[_species.Species]
def initialise(self):
# check if a SurfaceReactionBC is given
for bc in self.boundary_conditions:
if isinstance(bc, (boundary_conditions.SurfaceReactionBC)):
raise ValueError(
f"{type(bc)} not implemented for "
f"HydrogenTransportProblemDiscontinuousChangeVar"
)
if isinstance(bc, boundary_conditions.ParticleFluxBC):
if bc.species_dependent_value:
raise ValueError(
f"{type(bc)} concentration-dependent not implemented for "
f"HydrogenTransportProblemDiscontinuousChangeVar"
)
super().initialise()
def create_formulation(self):
"""Creates the formulation of the model."""
self.formulation = 0
# add diffusion and time derivative for each species
for spe in self.species:
u = spe.solution
u_n = spe.prev_solution
v = spe.test_function
for vol in self.volume_subdomains:
D = vol.material.get_diffusion_coefficient(
self.mesh.mesh, self.temperature_fenics, spe
)
if spe.mobile:
K_S = vol.material.get_solubility_coefficient(
self.mesh.mesh, self.temperature_fenics, spe
)
c = u * K_S
c_n = u_n * K_S
else:
c = u
c_n = u_n
if spe.mobile:
self.formulation += ufl.dot(D * ufl.grad(c), ufl.grad(v)) * self.dx(
vol.id
)
if self.settings.transient:
self.formulation += ((c - c_n) / self.dt) * v * self.dx(vol.id)
for reaction in self.reactions:
self.add_reaction_term(reaction)
# add sources
for source in self.sources:
self.formulation -= (
source.value.fenics_object
* source.species.test_function
* self.dx(source.volume.id)
)
# add fluxes
for bc in self._unpacked_bcs:
if isinstance(bc, boundary_conditions.ParticleFluxBC):
self.formulation -= (
bc.value_fenics
* bc.species.test_function
* self.ds(bc.subdomain.id)
)
# check if each species is defined in all volumes
if not self.settings.transient:
for spe in self.species:
# if species mobile, already defined in diffusion term
if not spe.mobile:
not_defined_in_volume = self.volume_subdomains.copy()
for vol in self.volume_subdomains:
# check reactions
for reaction in self.reactions:
if (
spe in reaction.product
): # TODO we probably need this in HydrogenTransportProblem
# too no?
if vol == reaction.volume:
if vol in not_defined_in_volume:
not_defined_in_volume.remove(vol)
# add c = 0 to formulation where needed
for vol in not_defined_in_volume:
self.formulation += (
spe.solution * spe.test_function * self.dx(vol.id)
)
def add_reaction_term(self, reaction: _reaction.Reaction):
"""Adds the reaction term to the formulation."""
products = (
reaction.product
if isinstance(reaction.product, list)
else [reaction.product]
)
# we cannot use the `concentration` attribute of the mobile species and need to
# use u * K_S instead
def get_concentrations(species_list) -> list:
concentrations = []
for spe in species_list:
if isinstance(spe, _species.ImplicitSpecies):
concentrations.append(None)
elif spe.mobile:
K_S = reaction.volume.material.get_solubility_coefficient(
self.mesh.mesh, self.temperature_fenics, spe
)
concentrations.append(spe.solution * K_S)
else:
concentrations.append(None)
return concentrations
reactant_concentrations = get_concentrations(reaction.reactant)
product_concentrations = get_concentrations(products)
# get the reaction term from the reaction
reaction_term = reaction.reaction_term(
temperature=self.temperature_fenics,
reactant_concentrations=reactant_concentrations,
product_concentrations=product_concentrations,
)
# add reaction term to formulation
# reactant
for reactant in reaction.reactant:
if isinstance(reactant, _species.Species):
self.formulation += (
reaction_term * reactant.test_function * self.dx(reaction.volume.id)
)
# product
for product in products:
self.formulation += (
-reaction_term * product.test_function * self.dx(reaction.volume.id)
)
def initialise_exports(self):
self.override_post_processing_solution()
super().initialise_exports()
def override_post_processing_solution(self):
# override the post-processing solution c = theta * K_S
Q0 = fem.functionspace(self.mesh.mesh, ("DG", 0))
Q1 = fem.functionspace(self.mesh.mesh, ("DG", 1))
for spe in self.species:
if not spe.mobile:
continue
K_S0 = fem.Function(Q0)
E_KS = fem.Function(Q0)
for subdomain in self.volume_subdomains:
entities = self.volume_meshtags.find(subdomain.id)
K_S0.x.array[entities] = subdomain.material.get_K_S_0(spe)
E_KS.x.array[entities] = subdomain.material.get_E_K_S(spe)
K_S = K_S0 * ufl.exp(-E_KS / (k_B * self.temperature_fenics))
theta = spe.solution
spe.dg_expr = fem.Expression(
theta * K_S, get_interpolation_points(Q1.element)
)
spe.post_processing_solution = fem.Function(Q1)
spe.post_processing_solution.interpolate(
spe.dg_expr
) # NOTE: do we need this line since it's in initialise?
def update_post_processing_solutions(self):
"""Updates the post-processing solutions after each time step."""
# need to compute c = theta * K_S
# this expression is stored in species.dg_expr
for spe in self.species:
if not spe.mobile:
continue
spe.post_processing_solution.interpolate(spe.dg_expr)
def create_dirichletbc_form(self, bc: boundary_conditions.FixedConcentrationBC):
"""Creates a dirichlet boundary condition form.
Args:
bc (festim.DirichletBC): the boundary condition
Returns:
dolfinx.fem.bcs.DirichletBC: A representation of
the boundary condition for modifying linear systems.
"""
# create K_S function
Q0 = fem.functionspace(self.mesh.mesh, ("DG", 0))
K_S0 = fem.Function(Q0)
E_KS = fem.Function(Q0)
for subdomain in self.volume_subdomains:
entities = self.volume_meshtags.find(subdomain.id)
K_S0.x.array[entities] = subdomain.material.get_K_S_0(bc.species)
E_KS.x.array[entities] = subdomain.material.get_E_K_S(bc.species)
K_S = K_S0 * ufl.exp(-E_KS / (k_B * self.temperature_fenics))
# create value_fenics
bc.create_value(
temperature=self.temperature_fenics,
function_space=bc.species.collapsed_function_space,
t=self.t,
K_S=K_S,
)
# get dofs
if isinstance(bc.value_fenics, fem.Function):
function_space_dofs = (
bc.species.sub_function_space,
bc.species.collapsed_function_space,
)
else:
function_space_dofs = bc.species.sub_function_space
bc_dofs = bc.define_surface_subdomain_dofs(
facet_meshtags=self.facet_meshtags,
function_space=function_space_dofs,
)
# create form
form = fem.dirichletbc(
value=bc.value_fenics,
dofs=bc_dofs,
V=bc.species.sub_function_space,
)
return form
def update_time_dependent_values(self):
super().update_time_dependent_values()
if self.temperature_time_dependent:
for bc in self.boundary_conditions:
if isinstance(bc, boundary_conditions.FixedConcentrationBC):
bc.update(self.t)