Package hedge :: Package backends :: Package mpi
[hide private]
[frames] | no frames]

Source Code for Package hedge.backends.mpi

  1  """MPI-based distributed-memory parallelism support""" 
  2   
  3  from __future__ import division 
  4   
  5  __copyright__ = "Copyright (C) 2007 Andreas Kloeckner" 
  6   
  7  __license__ = """ 
  8  This program is free software: you can redistribute it and/or modify 
  9  it under the terms of the GNU General Public License as published by 
 10  the Free Software Foundation, either version 3 of the License, or 
 11  (at your option) any later version. 
 12   
 13  This program is distributed in the hope that it will be useful, 
 14  but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 16  GNU General Public License for more details. 
 17   
 18  You should have received a copy of the GNU General Public License 
 19  along with this program.  If not, see U{http://www.gnu.org/licenses/}. 
 20  """ 
 21   
 22   
 23   
 24   
 25  import pytools 
 26  import numpy 
 27  import numpy.linalg as la 
 28  import hedge.discretization 
 29  import hedge.mesh 
 30  from hedge.optemplate import \ 
 31          IdentityMapper, \ 
 32          FluxOpReducerMixin 
 33  from hedge.tools import Future 
 34  from hedge.backends import RunContext 
 35  import boostmpi as mpi 
36 37 38 39 40 -class RankData(pytools.Record):
41 - def __init__(self, 42 mesh, 43 global2local_elements, 44 global2local_vertex_indices, 45 neighbor_ranks, 46 global_periodic_opposite_faces, 47 old_el_numbers=None 48 ):
49 pytools.Record.__init__(self, locals())
50
51 - def reordered_by(self, *args, **kwargs):
52 old_el_numbers = self.mesh.get_reorder_oldnumbers(*args, **kwargs) 53 mesh = self.mesh.reordered(old_el_numbers) 54 return self.copy( 55 mesh=mesh, 56 old_el_numbers=old_el_numbers 57 )
58
59 60 61 62 -class MPIRunContext(RunContext):
63 - def __init__(self, communicator, discr_class):
64 self.communicator = communicator 65 self.discr_class = discr_class
66 67 @property
68 - def rank(self):
69 return self.communicator.rank
70 71 @property
72 - def ranks(self):
73 return range(0, self.communicator.size)
74 75 @property
76 - def head_rank(self):
77 return 0
78
79 - def distribute_mesh(self, mesh, partition=None):
80 assert self.is_head_rank 81 82 if partition is None: 83 partition = len(self.ranks) 84 85 # compute partition using Metis, if necessary 86 if isinstance(partition, int): 87 from pymetis import part_graph 88 dummy, partition = part_graph(partition, 89 mesh.element_adjacency_graph()) 90 91 from hedge.partition import partition_mesh 92 from hedge.mesh import TAG_RANK_BOUNDARY 93 for part_data in partition_mesh( 94 mesh, partition, part_bdry_tag_factory=TAG_RANK_BOUNDARY): 95 96 rank_data = RankData( 97 mesh=part_data.mesh, 98 global2local_elements=part_data.global2local_elements, 99 global2local_vertex_indices=part_data.global2local_vertex_indices, 100 neighbor_ranks=part_data.neighbor_parts, 101 global_periodic_opposite_faces=part_data.global_periodic_opposite_faces) 102 103 rank = part_data.part_nr 104 105 if rank == self.head_rank: 106 result = rank_data 107 else: 108 print "send rank", rank 109 self.communicator.send(rank, 0, rank_data) 110 print "end send", rank 111 112 return result
113
114 - def receive_mesh(self):
115 return self.communicator.recv(self.head_rank, 0) 116 print "receive end rank", self.rank
117
118 - def make_discretization(self, mesh_data, *args, **kwargs):
119 return ParallelDiscretization(self, self.discr_class, mesh_data, *args, **kwargs)
120
121 122 123 124 # Subtlety here: The vectors for isend and irecv need to stay allocated 125 # for as long as the request is not completed. The wrapper aids this 126 # by making sure the vector outlives the request by using Boost.Python's 127 # with_custodian_and_ward mechanism. However, this "life support" gets 128 # eliminated if the only reference to the request is from inside a 129 # RequestList, so we need to provide our own life support for these vectors. 130 131 132 133 134 -class BoundarizeSendFuture(Future):
135 - def __init__(self, pdiscr, rank, field):
136 self.pdiscr = pdiscr 137 self.rank = rank 138 139 from hedge.mesh import TAG_RANK_BOUNDARY 140 self.bdry_future = pdiscr.boundarize_volume_field_async( 141 field, TAG_RANK_BOUNDARY(rank), kind="numpy") 142 143 self.is_ready = self.bdry_future.is_ready
144
145 - def __call__(self):
146 return [], [SendCompletionFuture( 147 self.pdiscr.context.communicator, self.rank, 148 self.bdry_future(), self.pdiscr)]
149
150 151 152 153 -class MPICompletionFuture(Future):
154 - def __init__(self, request):
155 self.request = request 156 self.result = None
157
158 - def is_ready(self):
159 if self.request is not None: 160 status = self.request.test() 161 if status is not None: 162 self.result = self.finish(status) 163 self.request = None 164 return True 165 166 return False
167
168 - def __call__(self):
169 if self.request is not None: 170 status = self.request.wait() 171 return self.finish(status) 172 else: 173 return self.result
174
175 176 177 178 -class SendCompletionFuture(MPICompletionFuture):
179 - def __init__(self, comm, rank, send_vec, pdiscr):
180 self.send_vec = send_vec 181 182 assert send_vec.dtype == pdiscr.default_scalar_type 183 184 from boostmpi import isend_buffer 185 MPICompletionFuture.__init__(self, 186 isend_buffer(comm, rank, tag=1, 187 vector=send_vec))
188
189 - def finish(self, status):
190 return [], []
191
192 193 194 195 -class ReceiveCompletionFuture(MPICompletionFuture):
196 - def __init__(self, pdiscr, shape, rank, indices_and_names):
197 self.pdiscr = pdiscr 198 self.rank = rank 199 self.indices_and_names = indices_and_names 200 201 self.recv_vec = pdiscr.boundary_empty( 202 hedge.mesh.TAG_RANK_BOUNDARY(rank), 203 shape=shape, 204 kind="numpy-mpi-recv", 205 dtype=self.pdiscr.default_scalar_type) 206 from boostmpi import irecv_buffer 207 MPICompletionFuture.__init__(self, 208 irecv_buffer(pdiscr.context.communicator, 209 rank, tag=1, vector=self.recv_vec))
210
211 - def finish(self, status):
212 return [], [BoundaryConvertFuture( 213 self.pdiscr, self.rank, self.indices_and_names, 214 self.recv_vec)]
215
216 217 218 219 -class BoundaryConvertFuture(Future):
220 - def __init__(self, pdiscr, rank, indices_and_names, recv_vec):
221 self.pdiscr = pdiscr 222 self.rank = rank 223 self.indices_and_names = indices_and_names 224 self.recv_vec = recv_vec 225 226 fnm = pdiscr.from_neighbor_maps[rank] 227 228 from hedge.mesh import TAG_RANK_BOUNDARY 229 self.convert_future = self.pdiscr.convert_boundary_async( 230 self.recv_vec, TAG_RANK_BOUNDARY(rank), 231 kind=self.pdiscr.compute_kind, 232 read_map=fnm) 233 234 self.is_ready = self.convert_future.is_ready
235
236 - def __call__(self):
237 converted_vec = self.convert_future() 238 return [(name, converted_vec[idx]) 239 for idx, name in self.indices_and_names], []
240
241 242 243 244 -def make_custom_exec_mapper_class(superclass):
245 class ExecutionMapper(superclass): 246 def __init__(self, context, executor): 247 superclass.__init__(self, context, executor) 248 self.discr = executor.discr
249 250 def exec_flux_exchange_batch_assign(self, insn): 251 pdiscr = self.discr.parallel_discr 252 253 from hedge.tools import log_shape 254 255 field = self.rec(insn.field) 256 shape = log_shape(field) 257 258 if self.discr.instrumented: 259 if len(shape): 260 pdiscr.comm_flux_counter.add(len(pdiscr.neighbor_ranks)*shape[0]) 261 else: 262 pdiscr.comm_flux_counter.add(len(pdiscr.neighbor_ranks)) 263 264 return ([], 265 [BoundarizeSendFuture(pdiscr, rank, field) 266 for rank in pdiscr.neighbor_ranks] 267 + [ReceiveCompletionFuture(pdiscr, shape, rank, 268 insn.rank_to_index_and_name[rank]) 269 for rank in pdiscr.neighbor_ranks]) 270 271 return ExecutionMapper 272
273 274 275 276 -class FluxCommunicationInserter( 277 IdentityMapper, 278 FluxOpReducerMixin):
279 - def __init__(self, interacting_ranks):
280 self.interacting_ranks = interacting_ranks
281
282 - def map_operator_binding(self, expr):
283 from hedge.optemplate import \ 284 FluxOperatorBase, \ 285 BoundaryPair, OperatorBinding, \ 286 FluxExchangeOperator 287 288 if isinstance(expr, OperatorBinding): 289 if isinstance(expr.op, FluxOperatorBase): 290 if isinstance(expr.field, BoundaryPair): 291 # we're only worried about internal fluxes 292 return IdentityMapper.map_operator_binding(self, expr) 293 294 # by now we've narrowed it down to a bound interior flux 295 from pymbolic.primitives import \ 296 flattened_sum, \ 297 CommonSubexpression 298 299 def func_and_cse(func, arg_fields): 300 from hedge.tools import is_obj_array, make_obj_array 301 if is_obj_array(arg_fields): 302 return make_obj_array([ 303 CommonSubexpression( 304 OperatorBinding( 305 func(i), 306 arg_fields)) 307 for i in range(len(arg_fields))]) 308 else: 309 return CommonSubexpression( 310 OperatorBinding( 311 func(()), 312 arg_fields))
313 314 from hedge.mesh import TAG_RANK_BOUNDARY 315 316 def exchange_and_cse(rank): 317 return func_and_cse( 318 lambda i: FluxExchangeOperator(i, rank), 319 expr.field)
320 321 return flattened_sum([expr] 322 + [OperatorBinding(expr.op, BoundaryPair( 323 expr.field, 324 exchange_and_cse(rank), 325 TAG_RANK_BOUNDARY(rank))) 326 for rank in self.interacting_ranks]) 327 else: 328 return IdentityMapper.map_operator_binding(self, expr) 329
330 331 332 333 -class ParallelDiscretization(object):
334 @classmethod
335 - def my_debug_flags(cls):
336 return set([ 337 "parallel_setup", 338 ])
339 340 @classmethod
341 - def all_debug_flags(cls, subcls):
342 return cls.my_debug_flags() | subcls.all_debug_flags()
343
344 - def __init__(self, rcon, subdiscr_class, rank_data, *args, **kwargs):
345 debug = set(kwargs.pop("debug", set())) 346 self.debug = self.my_debug_flags() & debug 347 kwargs["debug"] = debug - self.debug 348 kwargs["run_context"] = rcon 349 350 self.subdiscr = subdiscr_class(rank_data.mesh, *args, **kwargs) 351 self.subdiscr.exec_mapper_class = make_custom_exec_mapper_class( 352 self.subdiscr.exec_mapper_class) 353 self.subdiscr.parallel_discr = self 354 355 self.received_bdrys = {} 356 self.context = rcon 357 358 self.global2local_vertex_indices = rank_data.global2local_vertex_indices 359 self.neighbor_ranks = rank_data.neighbor_ranks 360 self.global_periodic_opposite_faces = rank_data.global_periodic_opposite_faces 361 362 if rank_data.old_el_numbers is not None: 363 from hedge.tools import reverse_lookup_table 364 new_el_numbers = reverse_lookup_table(rank_data.old_el_numbers) 365 self.global2local_elements = dict( 366 (gi, new_el_numbers[li]) 367 for gi, li in rank_data.global2local_elements.iteritems()) 368 else: 369 self.global2local_elements = rank_data.global2local_elements 370 371 self._setup_neighbor_connections()
372
373 - def add_instrumentation(self, mgr):
374 self.subdiscr.add_instrumentation(mgr) 375 376 from pytools.log import EventCounter 377 self.comm_flux_counter = EventCounter("n_comm_flux", 378 "Number of inner flux communication runs") 379 380 mgr.add_quantity(self.comm_flux_counter)
381 382 # property forwards -------------------------------------------------------
383 - def __len__(self):
384 return len(self.subdiscr)
385
386 - def __getattr__(self, name):
387 if not name.startswith("_"): 388 return getattr(self.subdiscr, name) 389 else: 390 raise AttributeError(name)
391 392 # neighbor connectivity ---------------------------------------------------
394 comm = self.context.communicator 395 396 # Why is this barrier needed? Some of our ranks may arrive at this 397 # point early and start sending packets to ranks that are still stuck 398 # in previous wildcard-recv loops. These receivers will then be very 399 # confused by packets they didn't expect, and, once they reach their 400 # recv bit in *this* subroutine, will wait for packets that will never 401 # arrive. This same argument does not apply to other recv()s in this 402 # file because they are targeted and thus benefit from MPI's 403 # non-overtaking rule. 404 # 405 # Parallel programming is fun. 406 comm.barrier() 407 408 if self.neighbor_ranks: 409 # send interface information to neighboring ranks ----------------- 410 from pytools import reverse_dictionary 411 local2global_vertex_indices = \ 412 reverse_dictionary(self.global2local_vertex_indices) 413 414 send_requests = mpi.RequestList() 415 416 for rank in self.neighbor_ranks: 417 bdry_tag = hedge.mesh.TAG_RANK_BOUNDARY(rank) 418 rank_bdry = self.subdiscr.mesh.tag_to_boundary[bdry_tag] 419 rank_discr_boundary = self.subdiscr.get_boundary(bdry_tag) 420 421 # a list of global vertex numbers for each face 422 my_vertices_global = [ 423 tuple(local2global_vertex_indices[vi] 424 for vi in el.faces[face_nr]) 425 for el, face_nr in rank_bdry] 426 427 # a list of node coordinates, indicating the order 428 # in which nodal values will be sent, this is for 429 # testing only and could (potentially) be omitted 430 431 my_node_coords = [] 432 for el, face_nr in rank_bdry: 433 eslice, ldis = self.subdiscr.find_el_data(el.id) 434 findices = ldis.face_indices()[face_nr] 435 436 my_node_coords.append( 437 [self.nodes[eslice.start+i] for i in findices]) 438 439 # compile a list of FluxFace.h values for unification 440 # across the rank boundary 441 442 my_h_values = [rank_discr_boundary.find_facepair_side(el_face).h 443 for el_face in rank_bdry] 444 445 packet = (my_vertices_global, my_node_coords, my_h_values) 446 447 send_requests.append(comm.isend(rank, 0, packet)) 448 449 received_packets = {} 450 while len(received_packets) < len(self.neighbor_ranks): 451 received_packet, status = comm.recv(tag=0, return_status=True) 452 received_packets[status.source] = received_packet 453 454 mpi.wait_all(send_requests) 455 456 # process received packets ---------------------------------------- 457 from pytools import flatten 458 459 # nb_ stands for neighbor_ 460 461 self.from_neighbor_maps = {} 462 463 for rank, (nb_all_facevertices_global, nb_node_coords, nb_h_values) in \ 464 received_packets.iteritems(): 465 bdry_tag = hedge.mesh.TAG_RANK_BOUNDARY(rank) 466 rank_bdry = self.subdiscr.mesh.tag_to_boundary[bdry_tag] 467 rank_discr_boundary = self.subdiscr.get_boundary(bdry_tag) 468 469 flat_nb_node_coords = list(flatten(nb_node_coords)) 470 471 # step 1: find start node indices for each 472 # of the neighbor's elements 473 nb_face_starts = [0] 474 for node_coords in nb_node_coords[:-1]: 475 nb_face_starts.append( 476 nb_face_starts[-1]+len(node_coords)) 477 478 # step 2: match faces by matching vertices 479 nb_face_order = dict( 480 (frozenset(vertices), i) 481 for i, vertices in enumerate(nb_all_facevertices_global)) 482 483 # step 3: make a list of indices into the data we 484 # receive from our neighbor that'll tell us how 485 # to reshuffle them to match our node order 486 from_indices = [] 487 488 shuffled_indices_cache = {} 489 490 def get_shuffled_indices(face_node_count, shuffle_op): 491 try: 492 return shuffled_indices_cache[shuffle_op] 493 except KeyError: 494 unshuffled_indices = range(face_node_count) 495 result = shuffled_indices_cache[shuffle_op] = \ 496 shuffle_op(unshuffled_indices) 497 return result
498 499 for el, face_nr in rank_bdry: 500 eslice, ldis = self.subdiscr.find_el_data(el.id) 501 502 my_vertices = el.faces[face_nr] 503 my_global_vertices = tuple(local2global_vertex_indices[vi] 504 for vi in my_vertices) 505 506 face_node_count = ldis.face_node_count() 507 try: 508 nb_face_idx = nb_face_order[frozenset(my_global_vertices)] 509 # continue below in else part 510 except KeyError: 511 # this happens if my_global_vertices is not a permutation 512 # of the neighbor's face vertices. Periodicity is the only 513 # reason why that would be so. 514 my_vertices_there, axis = self.global_periodic_opposite_faces[ 515 my_global_vertices] 516 nb_face_idx = nb_face_order[frozenset(my_vertices_there)] 517 518 his_vertices_here, axis2 = self.global_periodic_opposite_faces[ 519 nb_all_facevertices_global[nb_face_idx]] 520 521 assert axis == axis2 522 523 nb_face_start = nb_face_starts[nb_face_idx] 524 525 shuffle_op = \ 526 ldis.get_face_index_shuffle_to_match( 527 my_global_vertices, 528 his_vertices_here) 529 530 shuffled_other_node_indices = [nb_face_start+i 531 for i in get_shuffled_indices( 532 face_node_count, shuffle_op)] 533 534 from_indices.extend(shuffled_other_node_indices) 535 536 # check if the nodes really match up 537 if "parallel_setup" in self.debug: 538 my_node_indices = [eslice.start+i for i in ldis.face_indices()[face_nr]] 539 540 for my_i, other_i in zip(my_node_indices, shuffled_other_node_indices): 541 dist = self.nodes[my_i]-flat_nb_node_coords[other_i] 542 dist[axis] = 0 543 assert la.norm(dist) < 1e-14 544 else: 545 # continue handling of nonperiodic case 546 nb_global_vertices = nb_all_facevertices_global[nb_face_idx] 547 548 nb_face_start = nb_face_starts[nb_face_idx] 549 550 shuffle_op = \ 551 ldis.get_face_index_shuffle_to_match( 552 my_global_vertices, 553 nb_global_vertices) 554 555 shuffled_other_node_indices = [nb_face_start+i 556 for i in get_shuffled_indices( 557 face_node_count, shuffle_op)] 558 559 from_indices.extend(shuffled_other_node_indices) 560 561 # check if the nodes really match up 562 if "parallel_setup" in self.debug: 563 my_node_indices = [eslice.start+i 564 for i in ldis.face_indices()[face_nr]] 565 566 for my_i, other_i in zip(my_node_indices, shuffled_other_node_indices): 567 dist = self.nodes[my_i]-flat_nb_node_coords[other_i] 568 assert la.norm(dist) < 1e-14 569 570 # finally, unify FluxFace.h values across boundary 571 nb_h = nb_h_values[nb_face_idx] 572 flux_face = rank_discr_boundary.find_facepair_side((el, face_nr)) 573 flux_face.h = max(nb_h, flux_face.h) 574 575 if "parallel_setup" in self.debug: 576 assert len(from_indices) == len(flat_nb_node_coords) 577 578 # construct from_neighbor_map 579 self.from_neighbor_maps[rank] = \ 580 self.subdiscr.prepare_from_neighbor_map(from_indices)
581 582 # norm and integral -------------------------------------------------------
583 - def nodewise_dot_product(self, a, b):
584 from boostmpi import all_reduce 585 from operator import add 586 587 return all_reduce(self.context.communicator, 588 self.subdiscr.nodewise_inner_product(a, b), 589 add)
590
591 - def norm(self, volume_vector, p=2):
592 def add_norms(x, y): 593 return (x**p + y**p)**(1/p)
594 595 return mpi.all_reduce(self.context.communicator, 596 self.subdiscr.norm(volume_vector, p), 597 add_norms) 598
599 - def integral(self, volume_vector):
600 from operator import add 601 return mpi.all_reduce(self.context.communicator, 602 self.subdiscr.integral(volume_vector), 603 add)
604 605 # dt estimation -----------------------------------------------------------
606 - def dt_non_geometric_factor(self):
607 return mpi.all_reduce(self.context.communicator, 608 self.subdiscr.dt_non_geometric_factor(), 609 min)
610
611 - def dt_geometric_factor(self):
612 return mpi.all_reduce(self.context.communicator, 613 self.subdiscr.dt_geometric_factor(), 614 min)
615
616 - def dt_factor(self, max_system_ev):
617 return 1/max_system_ev \ 618 * self.dt_non_geometric_factor() \ 619 * self.dt_geometric_factor()
620 621 # compilation -------------------------------------------------------------
622 - def compile(self, optemplate, post_bind_mapper=lambda x:x ):
623 fci = FluxCommunicationInserter(self.neighbor_ranks) 624 return self.subdiscr.compile( 625 optemplate, 626 post_bind_mapper=lambda x: fci(post_bind_mapper(x)))
627
628 629 630 631 632 -def reassemble_volume_field(rcon, global_discr, local_discr, field):
633 from pytools import reverse_dictionary 634 local2global_element = reverse_dictionary( 635 local_discr.global2local_elements) 636 637 send_packet = {} 638 for eg in local_discr.element_groups: 639 for el, eslice in zip(eg.members, eg.ranges): 640 send_packet[local2global_element[el.id]] = field[eslice] 641 642 def reduction(a, b): 643 a.update(b) 644 return a
645 646 gfield_parts = mpi.reduce( 647 rcon.communicator, send_packet, reduction, rcon.head_rank) 648 649 if rcon.is_head_rank: 650 result = global_discr.volume_zeros() 651 for eg in global_discr.element_groups: 652 for el, eslice in zip(eg.members, eg.ranges): 653 my_part = gfield_parts[el.id] 654 assert len(my_part) == eslice.stop-eslice.start 655 result[eslice] = my_part 656 return result 657 else: 658 return None 659