Source code for sumpy.expansion.diff_op

__copyright__ = "Copyright (C) 2019 Isuru Fernando"

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

from collections import namedtuple
from pyrsistent import pmap
from pytools import memoize
from sumpy.tools import add_mi, find_linear_relationship
import logging

logger = logging.getLogger(__name__)

__doc__ = """
Differential operator interface
-------------------------------

.. autoclass:: LinearPDESystemOperator
.. autoclass:: DerivativeIdentifier
.. autofunction:: make_identity_diff_op
.. autofunction:: as_scalar_pde
"""

DerivativeIdentifier = namedtuple("DerivativeIdentifier", ["mi", "vec_idx"])


[docs]class LinearPDESystemOperator: r""" Represents a constant-coefficient linear differential operator of a vector-valued function with `dim` spatial variables. It is represented by a tuple of immutable dictionaries. The dictionary maps a :class:`DerivativeIdentifier` to the coefficient. This object is immutable. Optionally supports a time variable as the last variable in the multi-index of the :class:`DerivativeIdentifier`. """ def __init__(self, dim, *eqs): """ :arg dim: Number of spatial dimensions of the LinearPDESystemOperator :arg eqs: A list of dictionaries mapping a :class:`DerivativeIdentifier` to a coefficient. """ self.dim = dim self.eqs = tuple(eqs) def __eq__(self, other): return self.dim == other.dim and self.eqs == other.eqs def __hash__(self): return hash((self.dim, self.eqs)) @property def order(self): deg = 0 for eq in self.eqs: deg = max(deg, max(sum(ident.mi) for ident in eq.keys())) return deg def __mul__(self, param): eqs = [] for eq in self.eqs: deriv_ident_to_coeff = {} for k, v in eq.items(): deriv_ident_to_coeff[k] = v * param eqs.append(pmap(deriv_ident_to_coeff)) return LinearPDESystemOperator(self.dim, *eqs) __rmul__ = __mul__ def __add__(self, other_diff_op): assert self.dim == other_diff_op.dim assert len(self.eqs) == len(other_diff_op.eqs) eqs = [] for eq, other_eq in zip(self.eqs, other_diff_op.eqs): res = dict(eq) for k, v in other_eq.items(): if k in res: res[k] += v else: res[k] = v eqs.append(pmap(res)) return LinearPDESystemOperator(self.dim, *eqs) __radd__ = __add__ def __sub__(self, other_diff_op): return self + (-1)*other_diff_op def __repr__(self): return f"LinearPDESystemOperator({self.dim}, {repr(self.eqs)})" def __getitem__(self, idx): item = self.eqs.__getitem__(idx) if not isinstance(item, tuple): item = (item,) return LinearPDESystemOperator(self.dim, *item) @property def total_dims(self): """ Returns the total number of dimensions including time """ return len(self.eqs[0].keys()[0].mi) def to_sym(self, fnames=None): from sumpy.symbolic import make_sym_vector, Function x = list(make_sym_vector("x", self.dim)) x += list(make_sym_vector("t", self.total_dims - self.dim)) if fnames is None: noutputs = 0 for eq in self.eqs: for deriv_ident in eq.keys(): noutputs = max(noutputs, deriv_ident.vec_idx) fnames = [f"f{i}" for i in range(noutputs+1)] funcs = [Function(fname)(*x) for fname in fnames] res = [] for eq in self.eqs: sym_eq = 0 for deriv_ident, coeff in eq.items(): expr = funcs[deriv_ident.vec_idx] for i, val in enumerate(deriv_ident.mi): for _ in range(val): expr = expr.diff(x[i]) sym_eq += expr * coeff res.append(sym_eq) return res
[docs]@memoize def as_scalar_pde(pde, vec_idx): r""" Returns a scalar PDE that is satisfied by the *vec_idx* component of *pde*. :arg pde: An instance of :class:`LinearPDESystemOperator` :arg vec_idx: the index of the vector-valued function that we want as a scalar PDE """ from sumpy.tools import nullspace indices = set() for eq in pde.eqs: for deriv_ident in eq.keys(): indices.add(deriv_ident.vec_idx) # this is already a scalar pde if len(indices) == 1 and list(indices)[0] == vec_idx: return pde from pytools import ProcessLogger plog = ProcessLogger(logger, "computing single PDE for multiple PDEs") from pytools import ( generate_nonnegative_integer_tuples_summing_to_at_most as gnitstam) dim = pde.total_dims # slowly increase the order of the derivatives that we take of the # system of PDEs. Once we reach the order of the scalar PDE, this # loop will break for order in range(2, 100): mis = sorted(gnitstam(order, dim), key=sum) pde_mat = [] coeff_ident_enumerate_dict = dict((tuple(mi), i) for (i, mi) in enumerate(mis)) offset = len(mis) # Create a matrix of equations that are derivatives of the # original system of PDEs for mi in mis: for pde_dict in pde.eqs: eq = [0]*(len(mis)*(max(indices)+1)) for ident, coeff in pde_dict.items(): c = tuple(add_mi(ident.mi, mi)) if c not in coeff_ident_enumerate_dict: break idx = offset*ident.vec_idx + coeff_ident_enumerate_dict[c] eq[idx] = coeff else: pde_mat.append(eq) if len(pde_mat) == 0: continue # Get the nullspace of the matrix and get the rows related to this # vec_idx n = nullspace(pde_mat)[offset*vec_idx:offset*(vec_idx+1), :] indep_row = find_linear_relationship(n) if len(indep_row) > 0: pde_dict = {} mult = indep_row[max(indep_row.keys())] for k, v in indep_row.items(): pde_dict[DerivativeIdentifier(mis[k], 0)] = v / mult plog.done() return LinearPDESystemOperator(pde.dim, pmap(pde_dict)) plog.done() raise AssertionError
def laplacian(diff_op): dim = diff_op.dim empty = [pmap()] * len(diff_op.eqs) res = LinearPDESystemOperator(dim, *empty) for j in range(dim): mi = [0]*diff_op.total_dims mi[j] = 2 res = res + diff(diff_op, tuple(mi)) return res def diff(diff_op, mi): eqs = [] for eq in diff_op.eqs: res = {} for deriv_ident, v in eq.items(): new_mi = add_mi(deriv_ident.mi, mi) res[DerivativeIdentifier(new_mi, deriv_ident.vec_idx)] = v eqs.append(pmap(res)) return LinearPDESystemOperator(diff_op.dim, *eqs) def divergence(diff_op): assert len(diff_op.eqs) == diff_op.dim res = LinearPDESystemOperator(diff_op.dim, pmap()) for i in range(diff_op.dim): mi = [0]*diff_op.total_dims mi[i] = 1 res += diff(diff_op[i], tuple(mi)) return res def gradient(diff_op): assert len(diff_op.eqs) == 1 eqs = [] dim = diff_op.dim for i in range(dim): mi = [0]*diff_op.total_dims mi[i] = 1 eqs.append(diff(diff_op, tuple(mi)).eqs[0]) return LinearPDESystemOperator(dim, *eqs) def curl(pde): assert len(pde.eqs) == 3 assert pde.dim == 3 eqs = [] mis = [] for i in range(3): mi = [0]*pde.total_dims mi[i] = 1 mis.append(tuple(mi)) for i in range(3): new_pde = diff(pde[(i+2) % 3], mis[(i+1) % 3]) - \ diff(pde[(i+1) % 3], mis[(i+2) % 3]) eqs.append(new_pde.eqs[0]) return LinearPDESystemOperator(pde.dim, *eqs) def concat(*ops): ops = list(ops) assert len(ops) >= 1 dim = ops[0].dim for op in ops: assert op.dim == dim eqs = list(ops[0].eqs) for op in ops[1:]: eqs.extend(list(op.eqs)) return LinearPDESystemOperator(dim, *eqs)
[docs]def make_identity_diff_op(ninput, noutput=1, time_dependent=False): """ Returns the identity as a linear PDE system operator. if *include_time* is true, then the last dimension of the multi-index is time. :arg ninput: number of spatial variables of the function :arg noutput: number of output values of function :arg time_dependent: include time as a dimension """ if time_dependent: mi = tuple([0]*(ninput + 1)) else: mi = tuple([0]*ninput) eqs = [pmap({DerivativeIdentifier(mi, i): 1}) for i in range(noutput)] return LinearPDESystemOperator(ninput, *eqs)