Source code for sumpy.expansion

__copyright__ = "Copyright (C) 2012 Andreas Kloeckner"

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

import logging
from pytools import memoize_method
import sumpy.symbolic as sym
from sumpy.tools import add_mi
from typing import List, Tuple

__doc__ = """
.. autoclass:: ExpansionBase
.. autoclass:: LinearPDEBasedExpansionTermsWrangler

Expansion Factories
^^^^^^^^^^^^^^^^^^^

.. autoclass:: ExpansionFactoryBase
.. autoclass:: DefaultExpansionFactory
.. autoclass:: VolumeTaylorExpansionFactory
"""

logger = logging.getLogger(__name__)


# {{{ base class

[docs]class ExpansionBase: """ .. automethod:: with_kernel .. automethod:: __len__ .. automethod:: get_coefficient_identifiers .. automethod:: coefficients_from_source .. automethod:: translate_from .. automethod:: __eq__ .. automethod:: __ne__ """ init_arg_names = ("kernel", "order", "use_rscale") def __init__(self, kernel, order, use_rscale=None): # Don't be tempted to remove target derivatives here. # Line Taylor QBX can't do without them, because it can't # apply those derivatives to the expanded quantity. self.kernel = kernel self.order = order if use_rscale is None: use_rscale = True self.use_rscale = use_rscale # {{{ propagate kernel interface # This is here to conform this to enough of the kernel interface # to make it fit into sumpy.qbx.LayerPotential. @property def dim(self): return self.kernel.dim @property def is_complex_valued(self): return self.kernel.is_complex_valued def get_code_transformer(self): return self.kernel.get_code_transformer() def get_global_scaling_const(self): return self.kernel.get_global_scaling_const() def get_args(self): return self.kernel.get_args() def get_source_args(self): return self.kernel.get_source_args() # }}}
[docs] def with_kernel(self, kernel): return type(self)(kernel, self.order, self.use_rscale)
[docs] def __len__(self): return len(self.get_coefficient_identifiers())
[docs] def get_coefficient_identifiers(self): """ Returns the identifiers of the coefficients that actually get stored. """ raise NotImplementedError
[docs] def coefficients_from_source(self, kernel, avec, bvec, rscale, sac=None): """Form an expansion from a source point. :arg avec: vector from source to center. :arg bvec: vector from center to target. Not usually necessary, except for line-Taylor expansion. :arg sac: a symbolic assignment collection where temporary expressions are stored. :returns: a list of :mod:`sympy` expressions representing the coefficients of the expansion. """ raise NotImplementedError
def coefficients_from_source_vec(self, kernels, avec, bvec, rscale, weights, sac=None): """Form an expansion with a linear combination of kernels and weights. :arg avec: vector from source to center. :arg bvec: vector from center to target. Not usually necessary, except for line-Taylor expansion. :arg sac: a symbolic assignment collection where temporary expressions are stored. :returns: a list of :mod:`sympy` expressions representing the coefficients of the expansion. """ result = [0]*len(self) for knl, weight in zip(kernels, weights): coeffs = self.coefficients_from_source(knl, avec, bvec, rscale, sac=sac) for i in range(len(result)): result[i] += weight * coeffs[i] return result def evaluate(self, kernel, coeffs, bvec, rscale, sac=None): """ :return: a :mod:`sympy` expression corresponding to the evaluated expansion with the coefficients in *coeffs*. """ raise NotImplementedError
[docs] def translate_from(self, src_expansion, src_coeff_exprs, src_rscale, dvec, tgt_rscale, sac=None): raise NotImplementedError
def update_persistent_hash(self, key_hash, key_builder): key_hash.update(type(self).__name__.encode("utf8")) key_builder.rec(key_hash, self.kernel) key_builder.rec(key_hash, self.order) key_builder.rec(key_hash, self.use_rscale)
[docs] def __eq__(self, other): return ( type(self) == type(other) and self.kernel == other.kernel and self.order == other.order and self.use_rscale == other.use_rscale)
[docs] def __ne__(self, other): return not self.__eq__(other)
def copy(self, **kwargs): new_kwargs = { name: getattr(self, name) for name in self.init_arg_names} for name in self.init_arg_names: new_kwargs[name] = kwargs.pop(name, getattr(self, name)) if kwargs: raise TypeError("unexpected keyword arguments '%s'" % ", ".join(kwargs)) return type(self)(**new_kwargs)
# }}} # {{{ expansion terms wrangler class ExpansionTermsWrangler: init_arg_names = ("order", "dim", "max_mi") def __init__(self, order, dim, max_mi=None): self.order = order self.dim = dim self.max_mi = max_mi def get_coefficient_identifiers(self): raise NotImplementedError def get_full_kernel_derivatives_from_stored(self, stored_kernel_derivatives, rscale, sac=None): raise NotImplementedError def get_stored_mpole_coefficients_from_full(self, full_mpole_coefficients, rscale, sac=None): raise NotImplementedError @memoize_method def get_full_coefficient_identifiers(self): """ Returns identifiers for every coefficient in the complete expansion. """ from pytools import ( generate_nonnegative_integer_tuples_summing_to_at_most as gnitstam) res = sorted(gnitstam(self.order, self.dim), key=sum) if self.max_mi is None: return res return [mi for mi in res if all(mi[i] <= self.max_mi[i] for i in range(self.dim))] def copy(self, **kwargs): new_kwargs = { name: getattr(self, name) for name in self.init_arg_names} for name in self.init_arg_names: new_kwargs[name] = kwargs.pop(name, getattr(self, name)) if kwargs: raise TypeError("unexpected keyword arguments '%s'" % ", ".join(kwargs)) return type(self)(**new_kwargs) @memoize_method def _split_coeffs_into_hyperplanes(self) -> List[Tuple[int, List[Tuple[int]]]]: r""" This splits the coefficients into :math:`O(p)` disjoint sets so that for each set, all the identifiers have the form, :math:`(m_1, m_2, ..., m_{j-1}, c, m_{j+1}, ... , m_{\text{dim}})` where :math:`c` is a constant. Geometrically, each set is an axis-aligned hyperplane. If this is an instance of :class:`LinearPDEBasedExpansionTermsWrangler`, then the number of sets will be :math:`O(1)`. In the returned object ``[(axis, [mi_1, mi2, ...]), ...]``, each element in the outer list represents a hyperplane. Each element is a 2-tuple where the first element in the tuple is the axis number *axis* to which the hyperplane is orthogonal. The second element in the tuple is a list of multi-indices in the hyperplane corresponding to the stored coefficients. E.g. for Laplace 3D order 4, the result could be:: [ (2, [(0, 0, 0), (1, 0, 0), (2, 0, 0), (3, 0, 0), (0, 1, 0), (1, 1, 0), (2, 1, 0), (0, 2, 0), (1, 2, 0), (0, 3, 0)]), (2, [(0, 0, 1), (1, 0, 1), (2, 0, 1), (0, 1, 1), (1, 1, 1), (0, 2, 1)]), ] """ mis = self.get_full_coefficient_identifiers() mi_to_index = {mi: i for i, mi in enumerate(mis)} # Each hyperplane stored below is identified by a tuple of the axis # to which it is orthogonal to and the constant `c` described above hyperplanes = [] if isinstance(self, LinearPDEBasedExpansionTermsWrangler): pde_dict, = self.knl.get_pde_as_diff_op().eqs if not all(ident.mi in mi_to_index for ident in pde_dict): # The order of the expansion is less than the order of the PDE. # Treat as if full expansion. pass else: # Calculate the multi-index that appears last in in the PDE in # reverse degree lexicographic order (degrevlex). # The monomial ordering used here which is degrevlex, must match # the monomial ordering used in LinearPDEBasedExpansionTermsWrangler max_mi_idx = max(mi_to_index[ident.mi] for ident in pde_dict.keys()) max_mi = mis[max_mi_idx] hyperplanes.extend( (d, const) for d in range(self.dim) for const in range(max_mi[d])) if not hyperplanes: d = self.dim - 1 hyperplanes.extend((d, const) for const in range(self.order + 1)) res = [] seen_mis = set() for d, const in hyperplanes: coeffs_in_hyperplane = [] for mi in self.get_coefficient_identifiers(): # Check if the multi-index is in this hyperplane and # if it is not in any of the hyperplanes we saw before # (This rejects coefficients that might be in multiple # hyperplanes, such as near the origin.) if mi[d] == const and mi not in seen_mis: coeffs_in_hyperplane.append(mi) seen_mis.add(mi) res.append((d, coeffs_in_hyperplane)) return res class FullExpansionTermsWrangler(ExpansionTermsWrangler): get_coefficient_identifiers = ( ExpansionTermsWrangler.get_full_coefficient_identifiers) def get_full_kernel_derivatives_from_stored(self, stored_kernel_derivatives, rscale, sac=None): return stored_kernel_derivatives get_stored_mpole_coefficients_from_full = ( get_full_kernel_derivatives_from_stored) # {{{ sparse matrix-vector multiplication class CSEMatVecOperator: """ A class to facilitate a fast matrix vector multiplication with common subexpression eliminated. In compressed Taylor series, the compression matrix's operation count can be reduced to `O(p**d)` from `O(p**{2d-1})` using CSE. Each row in the matrix is represented by a linear combination of values from input vector and a linear combination of values from the output vector. .. attribute:: from_input_coeffs_by_row An object of type ``List[List[Tuple[int, Any]]]``. Each element in the list represents a row of the matrix using a linear combination of values from the input vector. Each element has the form ``(index of input vector, coeff)``. Number of rows in the matrix represented is equal to the length of the `from_input_coeffs_by_row` list. .. attribute:: from_output_coeffs_by_row An object of type ``List[List[Tuple[int, Any]]]``. Each element in the list represents a row of the matrix using a linear combination of values from the output vector. Each element has the form ``(index of output vector, coeff)``. .. attribute:: shape Shape of the matrix as a tuple. """ def __init__(self, from_input_coeffs_by_row, from_output_coeffs_by_row, shape): self.from_input_coeffs_by_row = from_input_coeffs_by_row self.from_output_coeffs_by_row = from_output_coeffs_by_row self.shape = shape assert len(self.from_input_coeffs_by_row) == shape[0] assert len(self.from_output_coeffs_by_row) == shape[0] def matvec(self, inp, wrap_intermediate=lambda x: x): """ :arg inp: vector for the matrix vector multiplication :arg wrap_intermediate: a function to wrap intermediate expressions If not given, the number of operations might grow in the final expressions in the vector resulting in an expensive matvec. """ assert len(inp) == self.shape[1] out = [] for i in range(self.shape[0]): value = 0 for input_index, coeff in self.from_input_coeffs_by_row[i]: value += inp[input_index] * coeff for output_index, coeff in self.from_output_coeffs_by_row[i]: value += out[output_index] * coeff out.append(wrap_intermediate(value)) return out def transpose_matvec(self, inp, wrap_intermediate=lambda x: x): assert len(inp) == self.shape[0] res = [0]*self.shape[1] expr_all = list(inp) for i in reversed(range(self.shape[0])): for output_index, coeff in self.from_output_coeffs_by_row[i]: expr_all[output_index] += expr_all[i] * coeff expr_all[output_index] = wrap_intermediate(expr_all[output_index]) for input_index, coeff in self.from_input_coeffs_by_row[i]: res[input_index] += expr_all[i] * coeff return res # }}}
[docs]class LinearPDEBasedExpansionTermsWrangler(ExpansionTermsWrangler): """ .. automethod:: __init__ """ init_arg_names = ("order", "dim", "knl", "max_mi")
[docs] def __init__(self, order, dim, knl, max_mi=None): r""" :param order: order of the expansion :param dim: number of dimensions :param knl: kernel for the PDE """ super().__init__(order, dim, max_mi) self.knl = knl
def get_coefficient_identifiers(self): return self.stored_identifiers def get_full_kernel_derivatives_from_stored(self, stored_kernel_derivatives, rscale, sac=None): from sumpy.tools import add_to_sac projection_matrix = self.get_projection_matrix(rscale) return projection_matrix.matvec(stored_kernel_derivatives, lambda x: add_to_sac(sac, x)) def get_stored_mpole_coefficients_from_full(self, full_mpole_coefficients, rscale, sac=None): from sumpy.tools import add_to_sac projection_matrix = self.get_projection_matrix(rscale) return projection_matrix.transpose_matvec(full_mpole_coefficients, lambda x: add_to_sac(sac, x)) @property def stored_identifiers(self): stored_identifiers, _ = self.get_stored_ids_and_unscaled_projection_matrix() return stored_identifiers @memoize_method def get_stored_ids_and_unscaled_projection_matrix(self): from pytools import ProcessLogger plog = ProcessLogger(logger, "compute PDE for Taylor coefficients") mis = self.get_full_coefficient_identifiers() coeff_ident_enumerate_dict = {tuple(mi): i for (i, mi) in enumerate(mis)} diff_op = self.knl.get_pde_as_diff_op() assert len(diff_op.eqs) == 1 pde_dict = {k.mi: v for k, v in diff_op.eqs[0].items()} for ident in pde_dict.keys(): if ident not in coeff_ident_enumerate_dict: # Order of the expansion is less than the order of the PDE. # In that case, the compression matrix is the identity matrix # and there's nothing to project from_input_coeffs_by_row = [[(i, 1)] for i in range(len(mis))] from_output_coeffs_by_row = [[] for _ in range(len(mis))] shape = (len(mis), len(mis)) op = CSEMatVecOperator(from_input_coeffs_by_row, from_output_coeffs_by_row, shape) return mis, op # Calculate the multi-index that appears last in in the PDE in # reverse degree lexicographic order (degrevlex). max_mi_idx = max(coeff_ident_enumerate_dict[ident] for ident in pde_dict.keys()) max_mi = mis[max_mi_idx] max_mi_coeff = pde_dict[max_mi] max_mi_mult = -1/sym.sympify(max_mi_coeff) def is_stored(mi): """ A multi_index mi is not stored if mi >= max_mi """ return any(mi[d] < max_mi[d] for d in range(self.dim)) stored_identifiers = [] from_input_coeffs_by_row = [] from_output_coeffs_by_row = [] for i, mi in enumerate(mis): # If the multi-index is to be stored, keep the projection matrix # entry empty if is_stored(mi): idx = len(stored_identifiers) stored_identifiers.append(mi) from_input_coeffs_by_row.append([(idx, 1)]) from_output_coeffs_by_row.append([]) continue diff = [mi[d] - max_mi[d] for d in range(self.dim)] # eg: u_xx + u_yy + u_zz is represented as # [((2, 0, 0), 1), ((0, 2, 0), 1), ((0, 0, 2), 1)] assignment = [] for other_mi, coeff in pde_dict.items(): j = coeff_ident_enumerate_dict[add_mi(other_mi, diff)] if i == j: # Skip the u_zz part here. continue # PDE might not have max_mi_coeff = -1, divide by -max_mi_coeff # to get a relation of the form, u_zz = - u_xx - u_yy for Laplace 3D. assignment.append((j, coeff*max_mi_mult)) from_input_coeffs_by_row.append([]) from_output_coeffs_by_row.append(assignment) plog.done() logger.debug("number of Taylor coefficients was reduced from {orig} to {red}" .format(orig=len(self.get_full_coefficient_identifiers()), red=len(stored_identifiers))) shape = (len(mis), len(stored_identifiers)) op = CSEMatVecOperator(from_input_coeffs_by_row, from_output_coeffs_by_row, shape) return stored_identifiers, op @memoize_method def get_projection_matrix(self, rscale): r""" Return a :class:`CSEMatVecOperator` object which exposes a matrix vector multiplication operator for the projection matrix that expresses every derivative in terms of a set of "stored" derivatives. For example, for the PDE:: u_xx + u_yy + u_zz = 0 the coefficient matrix features the following entries:: ... u_xx u_yy ... <= cols = only stored derivatives ================== ...| ... ... ... ... | u_zz| ... -1 -1 ... ^ rows = one for every derivative the projection matrix `M` is the transpose of the coefficient matrix, so that .. math:: c^{\text{local}}_{\text{full}} = M^T c^{\text{local}}_{\text{stored}}.\\ c^{\text{mpole}}_{\text{stored}} = M c^{\text{mpole}}_{\text{full}}. """ _, projection_matrix = \ self.get_stored_ids_and_unscaled_projection_matrix() full_coeffs = self.get_full_coefficient_identifiers() projection_with_rscale = [] for row, assignment in \ enumerate(projection_matrix.from_output_coeffs_by_row): # For eg: (u_xxx / rscale**3) = (u_yy / rscale**2) * coeff1 + # (u_xx / rscale**2) * coeff2 # is converted to u_xxx = u_yy * (rscale * coeff1) + # u_xx * (rscale * coeff2) row_rscale = sum(full_coeffs[row]) from_output_coeffs_with_rscale = [] for k, coeff in assignment: diff = row_rscale - sum(full_coeffs[k]) mult = rscale**diff from_output_coeffs_with_rscale.append((k, coeff * mult)) projection_with_rscale.append(from_output_coeffs_with_rscale) shape = projection_matrix.shape return CSEMatVecOperator(projection_matrix.from_input_coeffs_by_row, projection_with_rscale, shape)
# }}} # {{{ volume taylor class VolumeTaylorExpansionBase: @classmethod def get_or_make_expansion_terms_wrangler(cls, *key): """ This stores the expansion terms wrangler at the class attribute level because recreating the expansion terms wrangler implicitly empties its caches. """ try: wrangler = cls.expansion_terms_wrangler_cache[key] except KeyError: wrangler = cls.expansion_terms_wrangler_class(*key) cls.expansion_terms_wrangler_cache[key] = wrangler return wrangler @property def expansion_terms_wrangler(self): return self.get_or_make_expansion_terms_wrangler( *self.expansion_terms_wrangler_key) def get_coefficient_identifiers(self): """ Returns the identifiers of the coefficients that actually get stored. """ return self.expansion_terms_wrangler.get_coefficient_identifiers() def get_full_coefficient_identifiers(self): return self.expansion_terms_wrangler.get_full_coefficient_identifiers() @property @memoize_method def _storage_loc_dict(self): return {i: idx for idx, i in enumerate(self.get_coefficient_identifiers())} def get_storage_index(self, i): return self._storage_loc_dict[i] class VolumeTaylorExpansion(VolumeTaylorExpansionBase): expansion_terms_wrangler_class = FullExpansionTermsWrangler expansion_terms_wrangler_cache = {} # not user-facing, be strict about having to pass use_rscale def __init__(self, kernel, order, use_rscale): self.expansion_terms_wrangler_key = (order, kernel.dim) class LinearPDEConformingVolumeTaylorExpansion(VolumeTaylorExpansionBase): expansion_terms_wrangler_class = LinearPDEBasedExpansionTermsWrangler expansion_terms_wrangler_cache = {} # not user-facing, be strict about having to pass use_rscale def __init__(self, kernel, order, use_rscale): self.expansion_terms_wrangler_key = (order, kernel.dim, kernel) class LaplaceConformingVolumeTaylorExpansion( LinearPDEConformingVolumeTaylorExpansion): def __init__(self, *args, **kwargs): from warnings import warn warn("LaplaceConformingVolumeTaylorExpansion is deprecated. " "Use LinearPDEConformingVolumeTaylorExpansion instead.", DeprecationWarning, stacklevel=2) super().__init__(*args, **kwargs) class HelmholtzConformingVolumeTaylorExpansion( LinearPDEConformingVolumeTaylorExpansion): def __init__(self, *args, **kwargs): from warnings import warn warn("HelmholtzConformingVolumeTaylorExpansion is deprecated. " "Use LinearPDEConformingVolumeTaylorExpansion instead.", DeprecationWarning, stacklevel=2) super().__init__(*args, **kwargs) class BiharmonicConformingVolumeTaylorExpansion( LinearPDEConformingVolumeTaylorExpansion): def __init__(self, *args, **kwargs): from warnings import warn warn("BiharmonicConformingVolumeTaylorExpansion is deprecated. " "Use LinearPDEConformingVolumeTaylorExpansion instead.", DeprecationWarning, stacklevel=2) super().__init__(*args, **kwargs) # }}} # {{{ expansion factory
[docs]class ExpansionFactoryBase: """An interface .. automethod:: get_local_expansion_class .. automethod:: get_multipole_expansion_class """ def get_local_expansion_class(self, base_kernel): """Returns a subclass of :class:`ExpansionBase` suitable for *base_kernel*. """ raise NotImplementedError() def get_multipole_expansion_class(self, base_kernel): """Returns a subclass of :class:`ExpansionBase` suitable for *base_kernel*. """ raise NotImplementedError()
[docs]class VolumeTaylorExpansionFactory(ExpansionFactoryBase): """An implementation of :class:`ExpansionFactoryBase` that uses Volume Taylor expansions for each kernel. """ def get_local_expansion_class(self, base_kernel): """Returns a subclass of :class:`ExpansionBase` suitable for *base_kernel*. """ from sumpy.expansion.local import VolumeTaylorLocalExpansion return VolumeTaylorLocalExpansion def get_multipole_expansion_class(self, base_kernel): """Returns a subclass of :class:`ExpansionBase` suitable for *base_kernel*. """ from sumpy.expansion.multipole import VolumeTaylorMultipoleExpansion return VolumeTaylorMultipoleExpansion
[docs]class DefaultExpansionFactory(ExpansionFactoryBase): """An implementation of :class:`ExpansionFactoryBase` that gives the 'best known' expansion for each kernel. """ def get_local_expansion_class(self, base_kernel): """Returns a subclass of :class:`ExpansionBase` suitable for *base_kernel*. """ from sumpy.kernel import (HelmholtzKernel, YukawaKernel) from sumpy.expansion.local import (H2DLocalExpansion, Y2DLocalExpansion, LinearPDEConformingVolumeTaylorLocalExpansion, VolumeTaylorLocalExpansion) if (isinstance(base_kernel.get_base_kernel(), HelmholtzKernel) and base_kernel.dim == 2): return H2DLocalExpansion elif (isinstance(base_kernel.get_base_kernel(), YukawaKernel) and base_kernel.dim == 2): return Y2DLocalExpansion try: base_kernel.get_base_kernel().get_pde_as_diff_op() return LinearPDEConformingVolumeTaylorLocalExpansion except NotImplementedError: return VolumeTaylorLocalExpansion def get_multipole_expansion_class(self, base_kernel): """Returns a subclass of :class:`ExpansionBase` suitable for *base_kernel*. """ from sumpy.kernel import (HelmholtzKernel, YukawaKernel) from sumpy.expansion.multipole import (H2DMultipoleExpansion, Y2DMultipoleExpansion, LinearPDEConformingVolumeTaylorMultipoleExpansion, VolumeTaylorMultipoleExpansion) if (isinstance(base_kernel.get_base_kernel(), HelmholtzKernel) and base_kernel.dim == 2): return H2DMultipoleExpansion elif (isinstance(base_kernel.get_base_kernel(), YukawaKernel) and base_kernel.dim == 2): return Y2DMultipoleExpansion try: base_kernel.get_base_kernel().get_pde_as_diff_op() return LinearPDEConformingVolumeTaylorMultipoleExpansion except NotImplementedError: return VolumeTaylorMultipoleExpansion
# }}} # vim: fdm=marker