Source code for sumpy.kernel

__copyright__ = "Copyright (C) 2012 Andreas Kloeckner"

__license__ = """
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""


import loopy as lp
import numpy as np
from pymbolic.mapper import IdentityMapper, CSECachingMapperMixin
from sumpy.symbolic import pymbolic_real_norm_2
import sumpy.symbolic as sym
from pymbolic.primitives import make_sym_vector
from pymbolic import var, parse
from pytools import memoize_method
from collections import defaultdict

__doc__ = """
Kernel interface
----------------

.. autoclass:: Kernel
.. autoclass:: KernelArgument

Symbolic kernels
----------------

.. autoclass:: ExpressionKernel

PDE kernels
-----------

.. autoclass:: LaplaceKernel
.. autoclass:: BiharmonicKernel
.. autoclass:: HelmholtzKernel
.. autoclass:: YukawaKernel
.. autoclass:: StokesletKernel
.. autoclass:: StressletKernel
.. autoclass:: ElasticityKernel
.. autoclass:: LineOfCompressionKernel

Derivatives
-----------

These objects *wrap* other kernels and take derivatives
of them in the process.

.. autoclass:: DerivativeBase
.. autoclass:: AxisTargetDerivative
.. autoclass:: AxisSourceDerivative
.. autoclass:: DirectionalSourceDerivative
.. autoclass:: DirectionalTargetDerivative

Transforming kernels
--------------------

.. autoclass:: KernelMapper
.. autoclass:: KernelCombineMapper
.. autoclass:: KernelIdentityMapper
.. autoclass:: AxisSourceDerivativeRemover
.. autoclass:: AxisTargetDerivativeRemover
.. autoclass:: SourceDerivativeRemover
.. autoclass:: TargetDerivativeRemover
.. autoclass:: DerivativeCounter
"""


[docs]class KernelArgument: """ .. attribute:: loopy_arg A :class:`loopy.KernelArgument` instance describing the type, name, and other features of this kernel argument when passed to a generated piece of code. """ def __init__(self, loopy_arg): self.loopy_arg = loopy_arg @property def name(self): return self.loopy_arg.name def __eq__(self, other): if id(self) == id(other): return True if not type(self) == KernelArgument: return NotImplemented if not type(other) == KernelArgument: return NotImplemented return self.loopy_arg == other.loopy_arg def __ne__(self, other): # Needed for python2 return not self == other def __hash__(self): return (type(self), self.loopy_arg)
# {{{ basic kernel interface
[docs]class Kernel: """Basic kernel interface. .. attribute:: is_complex_valued .. attribute:: dim .. automethod:: get_base_kernel .. automethod:: replace_base_kernel .. automethod:: prepare_loopy_kernel .. automethod:: get_code_transformer .. automethod:: get_expression .. automethod:: postprocess_at_source .. automethod:: postprocess_at_target .. automethod:: get_global_scaling_const .. automethod:: get_args .. automethod:: get_source_args """ def __init__(self, dim): self.dim = dim # {{{ hashing/pickling/equality def __eq__(self, other): if self is other: return True elif hash(self) != hash(other): return False else: return (type(self) is type(other) and self.__getinitargs__() == other.__getinitargs__()) def __ne__(self, other): return not self.__eq__(other) def __hash__(self): try: return self.hash_value except AttributeError: self.hash_value = hash((type(self),) + self.__getinitargs__()) return self.hash_value def update_persistent_hash(self, key_hash, key_builder): key_hash.update(type(self).__name__.encode("utf8")) key_builder.rec(key_hash, self.__getinitargs__()) def __getstate__(self): return self.__getinitargs__() def __setstate__(self, state): # Can't use trivial pickling: hash_value cache must stay unset assert len(self.init_arg_names) == len(state) self.__init__(*state) # }}}
[docs] def get_base_kernel(self): """Return the kernel being wrapped by this one, or else *self*. """ return self
[docs] def replace_base_kernel(self, new_base_kernel): """Return the base kernel being wrapped by this one, or else *new_base_kernel*. """ return new_base_kernel
[docs] def prepare_loopy_kernel(self, loopy_knl): """Apply some changes (such as registering function manglers) to the kernel. Return the new kernel. """ return loopy_knl
[docs] def get_code_transformer(self): """Return a function to postprocess the :mod:`pymbolic` expression generated from the result of :meth:`get_expression` on the way to code generation. """ return lambda expr: expr
[docs] def get_expression(self, dist_vec): r"""Return a :mod:`sympy` expression for the kernel.""" raise NotImplementedError
def _diff(self, expr, vec, mi): """Take the derivative of an expression """ for i in range(self.dim): if mi[i] == 0: continue expr = expr.diff(vec[i], mi[i]) return expr
[docs] def postprocess_at_source(self, expr, avec): """Transform a kernel evaluation or expansion expression in a place where the vector a (something - source) is known. ("something" may be an expansion center or a target.) The typical use of this function is to apply source-variable derivatives to the kernel. """ from sumpy.tools import (ExprDerivativeTaker, DifferentiatedExprDerivativeTaker) expr_dict = {(0,)*self.dim: 1} expr_dict = self.get_derivative_transformation_at_source(expr_dict) if isinstance(expr, ExprDerivativeTaker): return DifferentiatedExprDerivativeTaker(expr, expr_dict) result = 0 for mi, coeff in expr_dict.items(): result += coeff * self._diff(expr, avec, mi) return result
[docs] def postprocess_at_target(self, expr, bvec): """Transform a kernel evaluation or expansion expression in a place where the vector b (target - something) is known. ("something" may be an expansion center or a target.) The typical use of this function is to apply target-variable derivatives to the kernel. """ from sumpy.tools import (ExprDerivativeTaker, DifferentiatedExprDerivativeTaker) expr_dict = {(0,)*self.dim: 1} expr_dict = self.get_derivative_transformation_at_target(expr_dict) if isinstance(expr, ExprDerivativeTaker): return DifferentiatedExprDerivativeTaker(expr, expr_dict) result = 0 for mi, coeff in expr_dict.items(): result += coeff * self._diff(expr, bvec, mi) return result
def get_derivative_transformation_at_source(self, expr_dict): r"""Get the derivative transformation of the expression at source represented by the dictionary expr_dict which is mapping from multi-index `mi` to coefficient `coeff`. Expression represented by the dictionary `expr_dict` is :math:`\sum_{mi} \frac{\partial^mi}{x^mi}G * coeff`. Returns an expression of the same type. This function is meant to be overridden by child classes where necessary. """ return expr_dict def get_derivative_transformation_at_target(self, expr_dict): r"""Get the derivative transformation of the expression at target represented by the dictionary expr_dict which is mapping from multi-index `mi` to coefficient `coeff`. Expression represented by the dictionary `expr_dict` is :math:`\sum_{mi} \frac{\partial^mi}{x^mi}G * coeff`. Returns an expression of the same type. This function is meant to be overridden by child classes where necessary. """ return expr_dict
[docs] def get_global_scaling_const(self): r"""Return a global scaling constant of the kernel. Typically, this ensures that the kernel is scaled so that :math:`\mathcal L(G)(x)=C\delta(x)` with a constant of 1, where :math:`\mathcal L` is the PDE operator associated with the kernel. Not to be confused with *rscale*, which keeps expansion coefficients benignly scaled. """ raise NotImplementedError
[docs] def get_args(self): """Return list of :class:`KernelArgument` instances describing extra arguments used by the kernel. """ return []
[docs] def get_source_args(self): """Return list of :class:`KernelArgument` instances describing extra arguments used by kernel in picking up contributions from point sources. """ return []
# }}}
[docs]class ExpressionKernel(Kernel): is_complex_valued = False init_arg_names = ("dim", "expression", "global_scaling_const", "is_complex_valued") def __init__(self, dim, expression, global_scaling_const, is_complex_valued): r""" :arg expression: A :mod:`pymbolic` expression depending on variables *d_1* through *d_N* where *N* equals *dim*. (These variables match what is returned from :func:`pymbolic.primitives.make_sym_vector` with argument `"d"`.) :arg global_scaling_const: A constant :mod:`pymbolic` expression for the global scaling of the kernel. Typically, this ensures that the kernel is scaled so that :math:`\mathcal L(G)(x)=C\delta(x)` with a constant of 1, where :math:`\mathcal L` is the PDE operator associated with the kernel. Not to be confused with *rscale*, which keeps expansion coefficients benignly scaled. """ # expression and global_scaling_const are pymbolic objects because # those pickle cleanly. D'oh, sympy! Kernel.__init__(self, dim) self.expression = expression self.global_scaling_const = global_scaling_const self.is_complex_valued = is_complex_valued def __getinitargs__(self): return (self.dim, self.expression, self.global_scaling_const, self.is_complex_valued) def __repr__(self): return "ExprKnl%dD" % self.dim def get_expression(self, scaled_dist_vec): from sumpy.symbolic import PymbolicToSympyMapperWithSymbols expr = PymbolicToSympyMapperWithSymbols()(self.expression) if self.dim != len(scaled_dist_vec): raise ValueError("dist_vec length does not match expected dimension") from sumpy.symbolic import Symbol expr = expr.xreplace({ Symbol("d%d" % i): dist_vec_i for i, dist_vec_i in enumerate(scaled_dist_vec) }) return expr def get_global_scaling_const(self): """Return a global scaling of the kernel.""" from sumpy.symbolic import PymbolicToSympyMapperWithSymbols return PymbolicToSympyMapperWithSymbols()( self.global_scaling_const) def update_persistent_hash(self, key_hash, key_builder): key_hash.update(type(self).__name__.encode("utf8")) for name, value in zip(self.init_arg_names, self.__getinitargs__()): if name in ["expression", "global_scaling_const"]: from pymbolic.mapper.persistent_hash import ( PersistentHashWalkMapper as PersistentHashWalkMapper) PersistentHashWalkMapper(key_hash)(value) else: key_builder.rec(key_hash, value) mapper_method = "map_expression_kernel" def get_derivative_taker(self, dvec, rscale, sac): """Return a :class:`sumpy.tools.ExprDerivativeTaker` instance that supports taking derivatives of the base kernel with respect to dvec. """ from sumpy.tools import ExprDerivativeTaker return ExprDerivativeTaker(self.get_expression(dvec), dvec, rscale, sac) def get_pde_as_diff_op(self): r""" Returns the PDE for the kernel as a :class:`sumpy.expansion.diff_op.LinearPDESystemOperator` object `L` where `L(u) = 0` is the PDE. """ raise NotImplementedError
one_kernel_2d = ExpressionKernel( dim=2, expression=1, global_scaling_const=1, is_complex_valued=False) one_kernel_3d = ExpressionKernel( dim=3, expression=1, global_scaling_const=1, is_complex_valued=False) # {{{ PDE kernels
[docs]class LaplaceKernel(ExpressionKernel): init_arg_names = ("dim",) def __init__(self, dim): # See (Kress LIE, Thm 6.2) for scaling if dim == 2: r = pymbolic_real_norm_2(make_sym_vector("d", dim)) expr = var("log")(r) scaling = 1/(-2*var("pi")) elif dim == 3: r = pymbolic_real_norm_2(make_sym_vector("d", dim)) expr = 1/r scaling = 1/(4*var("pi")) else: raise NotImplementedError("unsupported dimensionality") super().__init__( dim, expression=expr, global_scaling_const=scaling, is_complex_valued=False) def __getinitargs__(self): return (self.dim,) def __repr__(self): return "LapKnl%dD" % self.dim mapper_method = "map_laplace_kernel" def get_derivative_taker(self, dvec, rscale, sac): """Return a :class:`sumpy.tools.ExprDerivativeTaker` instance that supports taking derivatives of the base kernel with respect to dvec. """ from sumpy.tools import LaplaceDerivativeTaker return LaplaceDerivativeTaker(self.get_expression(dvec), dvec, rscale, sac) def get_pde_as_diff_op(self): from sumpy.expansion.diff_op import make_identity_diff_op, laplacian w = make_identity_diff_op(self.dim) return laplacian(w)
[docs]class BiharmonicKernel(ExpressionKernel): init_arg_names = ("dim",) def __init__(self, dim): r = pymbolic_real_norm_2(make_sym_vector("d", dim)) if dim == 2: # Ref: Farkas, Peter. Mathematical foundations for fast algorithms # for the biharmonic equation. Technical Report 765, # Department of Computer Science, Yale University, 1990. expr = r**2 * var("log")(r) scaling = 1/(8*var("pi")) elif dim == 3: # Ref: Jiang, Shidong, Bo Ren, Paul Tsuji, and Lexing Ying. # "Second kind integral equations for the first kind Dirichlet problem # of the biharmonic equation in three dimensions." # Journal of Computational Physics 230, no. 19 (2011): 7488-7501. expr = r scaling = -1/(8*var("pi")) else: raise RuntimeError("unsupported dimensionality") super().__init__( dim, expression=expr, global_scaling_const=scaling, is_complex_valued=False) def __getinitargs__(self): return (self.dim,) def __repr__(self): return "BiharmKnl%dD" % self.dim mapper_method = "map_biharmonic_kernel" def get_derivative_taker(self, dvec, rscale, sac): """Return a :class:`sumpy.tools.ExprDerivativeTaker` instance that supports taking derivatives of the base kernel with respect to dvec. """ from sumpy.tools import RadialDerivativeTaker return RadialDerivativeTaker(self.get_expression(dvec), dvec, rscale, sac) def get_pde_as_diff_op(self): from sumpy.expansion.diff_op import make_identity_diff_op, laplacian w = make_identity_diff_op(self.dim) return laplacian(laplacian(w))
[docs]class HelmholtzKernel(ExpressionKernel): init_arg_names = ("dim", "helmholtz_k_name", "allow_evanescent") def __init__(self, dim, helmholtz_k_name="k", allow_evanescent=False): """ :arg helmholtz_k_name: The argument name to use for the Helmholtz parameter when generating functions to evaluate this kernel. """ k = var(helmholtz_k_name) # Guard against code using the old positional interface. assert isinstance(allow_evanescent, bool) if dim == 2: r = pymbolic_real_norm_2(make_sym_vector("d", dim)) expr = var("hankel_1")(0, k*r) scaling = var("I")/4 elif dim == 3: r = pymbolic_real_norm_2(make_sym_vector("d", dim)) expr = var("exp")(var("I")*k*r)/r scaling = 1/(4*var("pi")) else: raise RuntimeError("unsupported dimensionality") super().__init__( dim, expression=expr, global_scaling_const=scaling, is_complex_valued=True) self.helmholtz_k_name = helmholtz_k_name self.allow_evanescent = allow_evanescent def __getinitargs__(self): return (self.dim, self.helmholtz_k_name, self.allow_evanescent) def update_persistent_hash(self, key_hash, key_builder): key_hash.update(type(self).__name__.encode("utf8")) key_builder.rec(key_hash, (self.dim, self.helmholtz_k_name, self.allow_evanescent)) def __repr__(self): return "HelmKnl%dD(%s)" % ( self.dim, self.helmholtz_k_name) def prepare_loopy_kernel(self, loopy_knl): from sumpy.codegen import register_bessel_callables return register_bessel_callables(loopy_knl) def get_args(self): if self.allow_evanescent: k_dtype = np.complex128 else: k_dtype = np.float64 return [ KernelArgument( loopy_arg=lp.ValueArg(self.helmholtz_k_name, k_dtype), )] mapper_method = "map_helmholtz_kernel" def get_derivative_taker(self, dvec, rscale, sac): """Return a :class:`sumpy.tools.ExprDerivativeTaker` instance that supports taking derivatives of the base kernel with respect to dvec. """ from sumpy.tools import HelmholtzDerivativeTaker return HelmholtzDerivativeTaker(self.get_expression(dvec), dvec, rscale, sac) def get_pde_as_diff_op(self): from sumpy.expansion.diff_op import make_identity_diff_op, laplacian w = make_identity_diff_op(self.dim) k = sym.Symbol(self.helmholtz_k_name) return (laplacian(w) + k**2 * w)
[docs]class YukawaKernel(ExpressionKernel): init_arg_names = ("dim", "yukawa_lambda_name") def __init__(self, dim, yukawa_lambda_name="lam"): """ :arg yukawa_lambda_name: The argument name to use for the Yukawa parameter when generating functions to evaluate this kernel. """ lam = var(yukawa_lambda_name) # NOTE: The Yukawa kernel is given by [1] # -1/(2 pi)**(n/2) * (lam/r)**(n/2-1) * K(n/2-1, lam r) # where K is a modified Bessel function of the second kind. # # [1] https://en.wikipedia.org/wiki/Green%27s_function # [2] https://dlmf.nist.gov/10.27#E8 # [3] https://dlmf.nist.gov/10.47#E2 # [4] https://dlmf.nist.gov/10.49 r = pymbolic_real_norm_2(make_sym_vector("d", dim)) if dim == 2: # NOTE: transform K(0, lam r) into a Hankel function using [2] expr = var("hankel_1")(0, var("I")*lam*r) scaling_for_K0 = var("pi")/2*var("I") # noqa: N806 scaling = -1/(2*var("pi")) * scaling_for_K0 elif dim == 3: # NOTE: to get the expression, we do the following and simplify # 1. express K(1/2, lam r) as a modified spherical Bessel function # k(0, lam r) using [3] and use expression for k(0, lam r) from [4] # 2. or use (AS 10.2.17) directly expr = var("exp")(-lam*r) / r scaling = -1/(4 * var("pi")**2) else: raise RuntimeError("unsupported dimensionality") super().__init__( dim, expression=expr, global_scaling_const=scaling, is_complex_valued=True) self.yukawa_lambda_name = yukawa_lambda_name def __getinitargs__(self): return (self.dim, self.yukawa_lambda_name) def update_persistent_hash(self, key_hash, key_builder): key_hash.update(type(self).__name__.encode("utf8")) key_builder.rec(key_hash, (self.dim, self.yukawa_lambda_name)) def __repr__(self): return "YukKnl%dD(%s)" % ( self.dim, self.yukawa_lambda_name) def prepare_loopy_kernel(self, loopy_knl): from sumpy.codegen import register_bessel_callables return register_bessel_callables(loopy_knl) def get_args(self): return [ KernelArgument( loopy_arg=lp.ValueArg(self.yukawa_lambda_name, np.float64), )] mapper_method = "map_yukawa_kernel" def get_derivative_taker(self, dvec, rscale, sac): """Return a :class:`sumpy.tools.ExprDerivativeTaker` instance that supports taking derivatives of the base kernel with respect to dvec. """ from sumpy.tools import HelmholtzDerivativeTaker return HelmholtzDerivativeTaker(self.get_expression(dvec), dvec, rscale, sac) def get_pde_as_diff_op(self): from sumpy.expansion.diff_op import make_identity_diff_op, laplacian w = make_identity_diff_op(self.dim) lam = sym.Symbol(self.yukawa_lambda_name) return (laplacian(w) - lam**2 * w)
[docs]class ElasticityKernel(ExpressionKernel): init_arg_names = ("dim", "icomp", "jcomp", "viscosity_mu", "poisson_ratio") def __new__(cls, dim, icomp, jcomp, viscosity_mu="mu", poisson_ratio="nu"): if poisson_ratio == 0.5: instance = super(ElasticityKernel, cls).__new__(StokesletKernel) else: instance = super(ElasticityKernel, cls).__new__(cls) return instance def __init__(self, dim, icomp, jcomp, viscosity_mu="mu", poisson_ratio="nu"): r""" :arg viscosity_mu: The argument name to use for dynamic viscosity :math:`\mu` when generating functions to evaluate this kernel. Can also be a numeric value. :arg poisson_ratio: The argument name to use for Poisson's ratio :math:`\nu` when generating functions to evaluate this kernel. Can also be a numeric value. """ if isinstance(viscosity_mu, str): mu = parse(viscosity_mu) else: mu = viscosity_mu if isinstance(poisson_ratio, str): nu = parse(poisson_ratio) else: nu = poisson_ratio if dim == 2: d = make_sym_vector("d", dim) r = pymbolic_real_norm_2(d) # See (Berger and Karageorghis 2001) expr = ( -var("log")(r)*((3 - 4 * nu) if icomp == jcomp else 0) + # noqa: W504 d[icomp]*d[jcomp]/r**2 ) scaling = -1/(8*var("pi")*(1 - nu)*mu) elif dim == 3: d = make_sym_vector("d", dim) r = pymbolic_real_norm_2(d) # Kelvin solution expr = ( (1/r)*((3 - 4*nu) if icomp == jcomp else 0) + # noqa: W504 d[icomp]*d[jcomp]/r**3 ) scaling = -1/(16*var("pi")*(1 - nu)*mu) else: raise RuntimeError("unsupported dimensionality") self.viscosity_mu = mu self.poisson_ratio = nu self.icomp = icomp self.jcomp = jcomp super().__init__( dim, expression=expr, global_scaling_const=scaling, is_complex_valued=False) def __getinitargs__(self): return (self.dim, self.icomp, self.jcomp, self.viscosity_mu, self.poisson_ratio) def __reduce__(self): return (ElasticityKernel, self.__getinitargs__()) def update_persistent_hash(self, key_hash, key_builder): from pymbolic.mapper.persistent_hash import PersistentHashWalkMapper key_hash.update(type(self).__name__.encode()) key_builder.rec(key_hash, (self.dim, self.icomp, self.jcomp)) mapper = PersistentHashWalkMapper(key_hash) mapper(self.viscosity_mu) mapper(self.poisson_ratio) def __repr__(self): return "ElasticityKnl%dD_%d%d" % (self.dim, self.icomp, self.jcomp) @memoize_method def get_args(self): from sumpy.tools import get_all_variables variables = get_all_variables(self.viscosity_mu) res = [] for v in variables: res.append(KernelArgument(loopy_arg=lp.ValueArg(v.name, np.float64))) return res + self.get_source_args() @memoize_method def get_source_args(self): from sumpy.tools import get_all_variables variables = get_all_variables(self.poisson_ratio) res = [] for v in variables: res.append(KernelArgument(loopy_arg=lp.ValueArg(v.name, np.float64))) return res mapper_method = "map_elasticity_kernel" def get_pde_as_diff_op(self): from sumpy.expansion.diff_op import make_identity_diff_op, laplacian w = make_identity_diff_op(self.dim) return laplacian(laplacian(w))
[docs]class StokesletKernel(ElasticityKernel): def __new__(cls, dim, icomp, jcomp, viscosity_mu="mu", poisson_ratio="0.5"): return object.__new__(cls) def __init__(self, dim, icomp, jcomp, viscosity_mu="mu", poisson_ratio=0.5): super().__init__(dim, icomp, jcomp, viscosity_mu, poisson_ratio) def __repr__(self): return "StokesletKnl%dD_%d%d" % (self.dim, self.icomp, self.jcomp)
[docs]class StressletKernel(ExpressionKernel): init_arg_names = ("dim", "icomp", "jcomp", "kcomp", "viscosity_mu") def __init__(self, dim, icomp, jcomp, kcomp, viscosity_mu="mu"): r""" :arg viscosity_mu: The argument name to use for dynamic viscosity :math:`\mu` the then generating functions to evaluate this kernel. """ # Mu is unused but kept for consistency with the stokeslet. if dim == 2: d = make_sym_vector("d", dim) r = pymbolic_real_norm_2(d) expr = ( d[icomp]*d[jcomp]*d[kcomp]/r**4 ) scaling = 1/(var("pi")) elif dim == 3: d = make_sym_vector("d", dim) r = pymbolic_real_norm_2(d) expr = ( d[icomp]*d[jcomp]*d[kcomp]/r**5 ) scaling = 3/(4*var("pi")) else: raise RuntimeError("unsupported dimensionality") self.icomp = icomp self.jcomp = jcomp self.kcomp = kcomp self.viscosity_mu = viscosity_mu super().__init__( dim, expression=expr, global_scaling_const=scaling, is_complex_valued=False) def __getinitargs__(self): return (self.dim, self.icomp, self.jcomp, self.kcomp, self.viscosity_mu) def update_persistent_hash(self, key_hash, key_builder): key_hash.update(type(self).__name__.encode()) key_builder.rec(key_hash, (self.dim, self.icomp, self.jcomp, self.kcomp, self.viscosity_mu)) def __repr__(self): return "StressletKnl%dD_%d%d%d" % (self.dim, self.icomp, self.jcomp, self.kcomp) mapper_method = "map_stresslet_kernel" def get_pde_as_diff_op(self): from sumpy.expansion.diff_op import make_identity_diff_op, laplacian w = make_identity_diff_op(self.dim) return laplacian(laplacian(w)) def get_args(self): return [ KernelArgument( loopy_arg=lp.ValueArg(self.viscosity_mu, np.float64), ) ]
[docs]class LineOfCompressionKernel(ExpressionKernel): """A kernel for the line of compression or dilatation of constant strength along the axis "axis" from zero to negative infinity. This is used for the explicit solution to half-space Elasticity problem. See [1] for details. [1]: Mindlin, R.: Force at a Point in the Interior of a Semi-Infinite Solid https://doi.org/10.1063/1.1745385 """ init_arg_names = ("dim", "axis", "viscosity_mu", "poisson_ratio") def __init__(self, dim=3, axis=2, viscosity_mu="mu", poisson_ratio="nu"): r""" :arg axis: axis number defaulting to 2 for the z axis. :arg viscosity_mu: The argument name to use for dynamic viscosity :math:`\mu` when generating functions to evaluate this kernel. Can also be a numeric value. :arg poisson_ratio: The argument name to use for Poisson's ratio :math:`\nu` when generating functions to evaluate this kernel. Can also be a numeric value. """ if isinstance(viscosity_mu, str): mu = parse(viscosity_mu) else: mu = viscosity_mu if isinstance(poisson_ratio, str): nu = parse(poisson_ratio) else: nu = poisson_ratio if dim == 3: d = make_sym_vector("d", dim) r = pymbolic_real_norm_2(d) # Kelvin solution expr = d[axis] * var("log")(r + d[axis]) - r scaling = (1 - 2*nu)/(4*var("pi")*mu) else: raise RuntimeError("unsupported dimensionality") self.viscosity_mu = mu self.poisson_ratio = nu self.axis = axis super().__init__( dim, expression=expr, global_scaling_const=scaling, is_complex_valued=False) def __getinitargs__(self): return (self.dim, self.axis, self.viscosity_mu, self.poisson_ratio) def update_persistent_hash(self, key_hash, key_builder): from pymbolic.mapper.persistent_hash import PersistentHashWalkMapper key_hash.update(type(self).__name__.encode()) key_builder.rec(key_hash, (self.dim, self.axis)) mapper = PersistentHashWalkMapper(key_hash) mapper(self.viscosity_mu) mapper(self.poisson_ratio) def __repr__(self): return "LineOfCompressionKnl%dD_%d" % (self.dim, self.axis) @memoize_method def get_args(self): from sumpy.tools import get_all_variables variables = list(get_all_variables(self.viscosity_mu)) \ + list(get_all_variables(self.poisson_ratio)) res = [] for v in variables: res.append(KernelArgument(loopy_arg=lp.ValueArg(v.name, np.float64))) return res mapper_method = "map_line_of_compression_kernel" def get_pde_as_diff_op(self): from sumpy.expansion.diff_op import make_identity_diff_op, laplacian w = make_identity_diff_op(self.dim) return laplacian(w)
# }}} # {{{ a kernel defined as wrapping another one--e.g., derivatives class KernelWrapper(Kernel): def __init__(self, inner_kernel): Kernel.__init__(self, inner_kernel.dim) self.inner_kernel = inner_kernel def get_base_kernel(self): return self.inner_kernel.get_base_kernel() def prepare_loopy_kernel(self, loopy_knl): return self.inner_kernel.prepare_loopy_kernel(loopy_knl) @property def is_complex_valued(self): return self.inner_kernel.is_complex_valued def get_expression(self, scaled_dist_vec): return self.inner_kernel.get_expression(scaled_dist_vec) def get_derivative_transformation_at_source(self, expr_dict): return self.inner_kernel.get_derivative_transformation_at_source(expr_dict) def get_derivative_transformation_at_target(self, expr_dict): return self.inner_kernel.get_derivative_transformation_at_target(expr_dict) def get_global_scaling_const(self): return self.inner_kernel.get_global_scaling_const() def get_code_transformer(self): return self.inner_kernel.get_code_transformer() def get_args(self): return self.inner_kernel.get_args() def get_source_args(self): return self.inner_kernel.get_source_args() def replace_base_kernel(self, new_base_kernel): raise NotImplementedError("replace_base_kernel is not implemented " "for this wrapper.") def get_derivative_taker(self, dvec, rscale, sac): return self.inner_kernel.get_derivative_taker(dvec, rscale, sac) # }}} # {{{ derivatives
[docs]class DerivativeBase(KernelWrapper): pass
[docs]class AxisSourceDerivative(DerivativeBase): init_arg_names = ("axis", "inner_kernel") def __init__(self, axis, inner_kernel): KernelWrapper.__init__(self, inner_kernel) self.axis = axis def __getinitargs__(self): return (self.axis, self.inner_kernel) def __str__(self): return "d/dy%d %s" % (self.axis, self.inner_kernel) def __repr__(self): return "AxisSourceDerivative(%d, %r)" % (self.axis, self.inner_kernel) def get_derivative_transformation_at_source(self, expr_dict): expr_dict = self.inner_kernel.get_derivative_transformation_at_source( expr_dict) result = dict() for mi, coeff in expr_dict.items(): new_mi = list(mi) new_mi[self.axis] += 1 result[tuple(new_mi)] = -coeff return result def replace_base_kernel(self, new_base_kernel): return type(self)(self.axis, self.inner_kernel.replace_base_kernel(new_base_kernel)) def replace_inner_kernel(self, new_inner_kernel): return type(self)(self.axis, new_inner_kernel) mapper_method = "map_axis_source_derivative"
[docs]class AxisTargetDerivative(DerivativeBase): init_arg_names = ("axis", "inner_kernel") def __init__(self, axis, inner_kernel): KernelWrapper.__init__(self, inner_kernel) self.axis = axis def __getinitargs__(self): return (self.axis, self.inner_kernel) def __str__(self): return "d/dx%d %s" % (self.axis, self.inner_kernel) def __repr__(self): return "AxisTargetDerivative(%d, %r)" % (self.axis, self.inner_kernel) def get_derivative_transformation_at_target(self, expr_dict): expr_dict = self.inner_kernel.get_derivative_transformation_at_target( expr_dict) result = dict() for mi, coeff in expr_dict.items(): new_mi = list(mi) new_mi[self.axis] += 1 result[tuple(new_mi)] = coeff return result def replace_base_kernel(self, new_base_kernel): return type(self)(self.axis, self.inner_kernel.replace_base_kernel(new_base_kernel)) def replace_inner_kernel(self, new_inner_kernel): return type(self)(self.axis, new_inner_kernel) mapper_method = "map_axis_target_derivative"
class _VectorIndexAdder(CSECachingMapperMixin, IdentityMapper): def __init__(self, vec_name, additional_indices): self.vec_name = vec_name self.additional_indices = additional_indices def map_subscript(self, expr): from pymbolic.primitives import CommonSubexpression if expr.aggregate.name == self.vec_name \ and isinstance(expr.index, int): return CommonSubexpression(expr.aggregate.index( (expr.index,) + self.additional_indices)) else: return IdentityMapper.map_subscript(self, expr) map_common_subexpression_uncached = IdentityMapper.map_common_subexpression class DirectionalDerivative(DerivativeBase): init_arg_names = ("inner_kernel", "dir_vec_name") def __init__(self, inner_kernel, dir_vec_name=None): if dir_vec_name is None: dir_vec_name = self.directional_kind + "_derivative_dir" else: from warnings import warn warn("specified the name of the direction vector", stacklevel=2) KernelWrapper.__init__(self, inner_kernel) self.dir_vec_name = dir_vec_name def __getinitargs__(self): return (self.inner_kernel, self.dir_vec_name) def update_persistent_hash(self, key_hash, key_builder): key_hash.update(type(self).__name__.encode("utf8")) key_builder.rec(key_hash, self.inner_kernel) key_builder.rec(key_hash, self.dir_vec_name) def replace_base_kernel(self, new_base_kernel): return type(self)(self.inner_kernel.replace_base_kernel(new_base_kernel), dir_vec_name=self.dir_vec_name) def __str__(self): return r"{} . \/_{} {}".format( self.dir_vec_name, self.directional_kind[0], self.inner_kernel) def __repr__(self): return "{}({!r}, {})".format( type(self).__name__, self.inner_kernel, self.dir_vec_name)
[docs]class DirectionalTargetDerivative(DirectionalDerivative): directional_kind = "tgt" def get_code_transformer(self): from sumpy.codegen import VectorComponentRewriter vcr = VectorComponentRewriter([self.dir_vec_name]) from pymbolic.primitives import Variable via = _VectorIndexAdder(self.dir_vec_name, (Variable("itgt"),)) def transform(expr): return via(vcr(expr)) return transform def get_derivative_transformation_at_target(self, expr_dict): from sumpy.symbolic import make_sym_vector as make_sympy_vector dir_vec = make_sympy_vector(self.dir_vec_name, self.dim) expr_dict = self.inner_kernel.get_derivative_transformation_at_target( expr_dict) # bvec = tgt - center result = defaultdict(lambda: 0) for mi, coeff in expr_dict.items(): for axis in range(self.dim): new_mi = list(mi) new_mi[axis] += 1 result[tuple(new_mi)] += coeff * dir_vec[axis] return result def get_source_args(self): return [ KernelArgument( loopy_arg=lp.GlobalArg( self.dir_vec_name, None, shape=(self.dim, "ntargets"), dim_tags="sep,C"), ) ] + self.inner_kernel.get_source_args() mapper_method = "map_directional_target_derivative"
[docs]class DirectionalSourceDerivative(DirectionalDerivative): directional_kind = "src" def get_code_transformer(self): inner = self.inner_kernel.get_code_transformer() from sumpy.codegen import VectorComponentRewriter vcr = VectorComponentRewriter([self.dir_vec_name]) from pymbolic.primitives import Variable via = _VectorIndexAdder(self.dir_vec_name, (Variable("isrc"),)) def transform(expr): return via(vcr(inner(expr))) return transform def get_derivative_transformation_at_source(self, expr_dict): from sumpy.symbolic import make_sym_vector as make_sympy_vector dir_vec = make_sympy_vector(self.dir_vec_name, self.dim) expr_dict = self.inner_kernel.get_derivative_transformation_at_source( expr_dict) # avec = center-src -> minus sign from chain rule result = defaultdict(lambda: 0) for mi, coeff in expr_dict.items(): for axis in range(self.dim): new_mi = list(mi) new_mi[axis] += 1 result[tuple(new_mi)] += -coeff * dir_vec[axis] return result def get_source_args(self): return [ KernelArgument( loopy_arg=lp.GlobalArg( self.dir_vec_name, None, shape=(self.dim, "nsources"), dim_tags="sep,C"), ) ] + self.inner_kernel.get_source_args() mapper_method = "map_directional_source_derivative"
class TargetPointMultiplier(KernelWrapper): """Wraps a kernel :math:`G(x, y)` and outputs :math:`x_j G(x, y)` where :math:`x, y` are targets and sources respectively. """ init_arg_names = ("axis", "inner_kernel") target_array_name = "targets" def __init__(self, axis, inner_kernel): KernelWrapper.__init__(self, inner_kernel) self.axis = axis def __getinitargs__(self): return (self.axis, self.inner_kernel) def __str__(self): return "x%d %s" % (self.axis, self.inner_kernel) def __repr__(self): return "TargetPointMultiplier(%d, %r)" % (self.axis, self.inner_kernel) def replace_base_kernel(self, new_base_kernel): return type(self)(self.axis, self.inner_kernel.replace_base_kernel(new_base_kernel)) def replace_inner_kernel(self, new_inner_kernel): return type(self)(self.axis, new_inner_kernel) def postprocess_at_target(self, expr, avec): from sumpy.symbolic import make_sym_vector as make_sympy_vector from sumpy.tools import (ExprDerivativeTaker, DifferentiatedExprDerivativeTaker) expr = self.inner_kernel.postprocess_at_target(expr, avec) target_vec = make_sympy_vector(self.target_array_name, self.dim) if isinstance(expr, (ExprDerivativeTaker, DifferentiatedExprDerivativeTaker)): class DerivativeTakerWrapper: def __init__(self, taker, vec, axis): self.taker = taker self.vec = vec self.axis = axis def diff(self, *args, **kwargs): return self.vec[self.axis] * self.taker.diff(*args, **kwargs) return DerivativeTakerWrapper(expr, target_vec, self.axis) return target_vec[self.axis] * expr def get_code_transformer(self): from sumpy.codegen import VectorComponentRewriter vcr = VectorComponentRewriter([self.target_array_name]) from pymbolic.primitives import Variable via = _VectorIndexAdder(self.target_array_name, (Variable("itgt"),)) def transform(expr): return via(vcr(expr)) return transform mapper_method = "map_target_point_multiplier" # }}} # {{{ kernel mappers
[docs]class KernelMapper: def rec(self, kernel): try: method = getattr(self, kernel.mapper_method) except AttributeError: raise RuntimeError("{} cannot handle {}".format( type(self), type(kernel))) else: return method(kernel) __call__ = rec
[docs]class KernelCombineMapper(KernelMapper): def map_difference_kernel(self, kernel): return self.combine([ self.rec(kernel.kernel_plus), self.rec(kernel.kernel_minus)]) def map_axis_target_derivative(self, kernel): return self.rec(kernel.inner_kernel) map_directional_target_derivative = map_axis_target_derivative map_directional_source_derivative = map_axis_target_derivative map_axis_source_derivative = map_axis_target_derivative map_target_point_multiplier = map_axis_target_derivative
[docs]class KernelIdentityMapper(KernelMapper): def map_expression_kernel(self, kernel): return kernel map_laplace_kernel = map_expression_kernel map_biharmonic_kernel = map_expression_kernel map_helmholtz_kernel = map_expression_kernel map_yukawa_kernel = map_expression_kernel map_elasticity_kernel = map_expression_kernel map_line_of_compression_kernel = map_expression_kernel map_stresslet_kernel = map_expression_kernel def map_axis_target_derivative(self, kernel): return type(kernel)(kernel.axis, self.rec(kernel.inner_kernel)) map_axis_source_derivative = map_axis_target_derivative map_target_point_multiplier = map_axis_target_derivative def map_directional_target_derivative(self, kernel): return type(kernel)( self.rec(kernel.inner_kernel), kernel.dir_vec_name) map_directional_source_derivative = map_directional_target_derivative
[docs]class AxisSourceDerivativeRemover(KernelIdentityMapper): def map_axis_source_derivative(self, kernel): return self.rec(kernel.inner_kernel)
[docs]class AxisTargetDerivativeRemover(KernelIdentityMapper): def map_axis_target_derivative(self, kernel): return self.rec(kernel.inner_kernel)
[docs]class TargetDerivativeRemover(AxisTargetDerivativeRemover): def map_directional_target_derivative(self, kernel): return self.rec(kernel.inner_kernel)
[docs]class SourceDerivativeRemover(AxisSourceDerivativeRemover): def map_directional_source_derivative(self, kernel): return self.rec(kernel.inner_kernel)
class TargetTransformationRemover(TargetDerivativeRemover): def map_target_point_multiplier(self, kernel): return self.rec(kernel.inner_kernel) SourceTransformationRemover = SourceDerivativeRemover
[docs]class DerivativeCounter(KernelCombineMapper): def combine(self, values): return max(values) def map_expression_kernel(self, kernel): return 0 map_laplace_kernel = map_expression_kernel map_biharmonic_kernel = map_expression_kernel map_helmholtz_kernel = map_expression_kernel map_yukawa_kernel = map_expression_kernel map_line_of_compression_kernel = map_expression_kernel map_stresslet_kernel = map_expression_kernel def map_axis_target_derivative(self, kernel): return 1 + self.rec(kernel.inner_kernel) map_directional_target_derivative = map_axis_target_derivative map_directional_source_derivative = map_axis_target_derivative map_axis_source_derivative = map_axis_target_derivative
# }}} def to_kernel_and_args(kernel_like): if (isinstance(kernel_like, tuple) and len(kernel_like) == 2 and isinstance(kernel_like[0], Kernel)): # already gone through to_kernel_and_args return kernel_like if not isinstance(kernel_like, Kernel): if kernel_like == 0: return LaplaceKernel(), {} elif isinstance(kernel_like, str): return HelmholtzKernel(None), {"k": var(kernel_like)} else: raise ValueError("Only Kernel instances, 0 (for Laplace) and " "variable names (strings) " "for the Helmholtz parameter are allowed as kernels.") return kernel_like, {} # vim: fdm=marker