1 """MPI-based distributed-memory parallelism support"""
2
3 from __future__ import division
4
5 __copyright__ = "Copyright (C) 2007 Andreas Kloeckner"
6
7 __license__ = """
8 This program is free software: you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation, either version 3 of the License, or
11 (at your option) any later version.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see U{http://www.gnu.org/licenses/}.
20 """


import pytools
import numpy
import numpy.linalg as la
import hedge.discretization
import hedge.mesh
from hedge.optemplate import \
        IdentityMapper, \
        FluxOpReducerMixin
from hedge.tools import Future
from hedge.backends import RunContext
import boostmpi as mpi



class RankData(pytools.Record):
    """Per-rank result of mesh partitioning: the rank-local mesh together with
    the global-to-local element and vertex maps, the set of neighboring ranks,
    and global periodic-face information.
    """

    def __init__(self,
            mesh,
            global2local_elements,
            global2local_vertex_indices,
            neighbor_ranks,
            global_periodic_opposite_faces,
            old_el_numbers=None
            ):
        pytools.Record.__init__(self, locals())




class MPIRunContext(RunContext):
    def __init__(self, communicator, discr_class):
        self.communicator = communicator
        self.discr_class = discr_class

    @property
    def rank(self):
        return self.communicator.rank

    @property
    def ranks(self):
        return range(0, self.communicator.size)

    @property
    def head_rank(self):
        return 0

    def distribute_mesh(self, mesh, partition=None):
        assert self.is_head_rank

        if partition is None:
            partition = len(self.ranks)

        # an integer partition count means: let PyMetis compute the partition
        if isinstance(partition, int):
            from pymetis import part_graph
            dummy, partition = part_graph(partition,
                    mesh.element_adjacency_graph())

        from hedge.partition import partition_mesh
        from hedge.mesh import TAG_RANK_BOUNDARY
        for part_data in partition_mesh(
                mesh, partition, part_bdry_tag_factory=TAG_RANK_BOUNDARY):

            rank_data = RankData(
                    mesh=part_data.mesh,
                    global2local_elements=part_data.global2local_elements,
                    global2local_vertex_indices=part_data.global2local_vertex_indices,
                    neighbor_ranks=part_data.neighbor_parts,
                    global_periodic_opposite_faces=part_data.global_periodic_opposite_faces)

            rank = part_data.part_nr

            if rank == self.head_rank:
                result = rank_data
            else:
                print "send rank", rank
                self.communicator.send(rank, 0, rank_data)
                print "end send", rank

        return result

    def receive_mesh(self):
        result = self.communicator.recv(self.head_rank, 0)
        print "receive end rank", self.rank
        return result

    def make_discretization(self, mesh_data, *args, **kwargs):
        return ParallelDiscretization(self, self.discr_class, mesh_data, *args, **kwargs)
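

# Example driver flow (sketch): the head rank partitions and distributes the
# mesh, every other rank receives its piece, and each rank then builds its
# local discretization. "make_mesh", "discr_class" and the trailing arguments
# are placeholders, not part of this module; mpi.world is boostmpi's world
# communicator.
#
#     rcon = MPIRunContext(mpi.world, discr_class)
#
#     if rcon.is_head_rank:
#         mesh_data = rcon.distribute_mesh(make_mesh())
#     else:
#         mesh_data = rcon.receive_mesh()
#
#     discr = rcon.make_discretization(mesh_data, order=4)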



class BoundarizeSendFuture(Future):
    def __init__(self, pdiscr, rank, field):




class MPICompletionFuture(Future):
    def __init__(self, request):
        self.request = request
        self.result = None

    def is_ready(self):
        if self.request is not None:
            status = self.request.test()
            if status is not None:
                self.result = self.finish(status)
                self.request = None
                return True

        return False

    def __call__(self):
        if self.request is not None:
            status = self.request.wait()
            return self.finish(status)
        else:
            return self.result
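

# MPICompletionFuture above wraps a pending boostmpi request: is_ready() polls
# the request, __call__() waits for it if needed, and both hand the completion
# status to a finish() method supplied by the subclasses below. As elsewhere in
# this module, a completed future returns a pair (assignments, new_futures):
# (name, value) bindings that are now available, plus any follow-up futures
# that still have to be driven to completion.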


class SendCompletionFuture(MPICompletionFuture):
    def __init__(self, comm, rank, send_vec, pdiscr):
        self.send_vec = send_vec

        assert send_vec.dtype == pdiscr.default_scalar_type

        from boostmpi import isend_buffer
        MPICompletionFuture.__init__(self,
                isend_buffer(comm, rank, tag=1,
                    vector=send_vec))



class ReceiveCompletionFuture(MPICompletionFuture):
    def __init__(self, pdiscr, shape, rank, indices_and_names):




class ReceivedBdryDataFuture(Future):
    def __init__(self, pdiscr, rank, indices_and_names, recv_vec):

    def __call__(self):
        converted_vec = self.convert_future()
        return [(name, converted_vec[idx])
                for idx, name in self.indices_and_names], []


def make_custom_exec_mapper_class(superclass):
    class ExecutionMapper(superclass):
        def __init__(self, context, executor):
            superclass.__init__(self, context, executor)
            self.discr = executor.discr

        def exec_flux_exchange_batch_assign(self, insn):
            pdiscr = self.discr.parallel_discr

            from hedge.tools import log_shape

            field = self.rec(insn.field)
            shape = log_shape(field)

            if self.discr.instrumented:
                if len(shape):
                    pdiscr.comm_flux_counter.add(len(pdiscr.neighbor_ranks)*shape[0])
                else:
                    pdiscr.comm_flux_counter.add(len(pdiscr.neighbor_ranks))

            return ([],
                    [BoundarizeSendFuture(pdiscr, rank, field)
                        for rank in pdiscr.neighbor_ranks]
                    + [ReceiveCompletionFuture(pdiscr, shape, rank,
                        insn.rank_to_index_and_name[rank])
                        for rank in pdiscr.neighbor_ranks])

    return ExecutionMapper
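

# A minimal sketch (not hedge's actual executor loop) of how these futures are
# meant to be driven: poll is_ready(), call the future once it is ready, record
# the returned bindings, and keep going until no futures remain. All names
# below are illustrative only.
#
#     pending = list(initial_futures)
#     bindings = {}
#     while pending:
#         for future in list(pending):
#             if future.is_ready():
#                 pending.remove(future)
#                 assignments, new_futures = future()
#                 bindings.update(dict(assignments))
#                 pending.extend(new_futures)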



class FluxCommunicationInserter(
        IdentityMapper,
        FluxOpReducerMixin):
    def __init__(self, interacting_ranks):
        self.interacting_ranks = interacting_ranks

    def map_operator_binding(self, expr):
        from hedge.optemplate import \
                FluxOperatorBase, \
                BoundaryPair, OperatorBinding, \
                FluxExchangeOperator

        if isinstance(expr, OperatorBinding):
            if isinstance(expr.op, FluxOperatorBase):
                if isinstance(expr.field, BoundaryPair):
                    # a boundary flux--no rank-to-rank communication needed
                    return IdentityMapper.map_operator_binding(self, expr)

                # an interior flux: add flux exchanges with all
                # interacting ranks to it
                from pymbolic.primitives import \
                        flattened_sum, \
                        CommonSubexpression

                def func_and_cse(func, arg_fields):
                    from hedge.tools import is_obj_array, make_obj_array
                    if is_obj_array(arg_fields):
                        return make_obj_array([
                            CommonSubexpression(
                                OperatorBinding(
                                    func(i),
                                    arg_fields))
                            for i in range(len(arg_fields))])
                    else:
                        return CommonSubexpression(
                                OperatorBinding(
                                    func(()),
                                    arg_fields))

                from hedge.mesh import TAG_RANK_BOUNDARY

                def exchange_and_cse(rank):
                    return func_and_cse(
                            lambda i: FluxExchangeOperator(i, rank),
                            expr.field)

                return flattened_sum([expr]
                        + [OperatorBinding(expr.op, BoundaryPair(
                            expr.field,
                            exchange_and_cse(rank),
                            TAG_RANK_BOUNDARY(rank)))
                            for rank in self.interacting_ranks])
            else:
                return IdentityMapper.map_operator_binding(self, expr)
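

# In effect, the mapper above rewrites every bound interior flux
#
#     flux_op * u
#
# into (schematically)
#
#     flux_op * u
#       + sum(flux_op * BoundaryPair(u,
#                 FluxExchangeOperator(i, rank) * u,
#                 TAG_RANK_BOUNDARY(rank))
#             for rank in interacting_ranks)
#
# so that, in addition to the purely local contribution, each rank also
# computes the flux across its rank boundaries using the neighbor data that
# the flux-exchange operators bring in. (The actual expression nodes are the
# OperatorBinding/CommonSubexpression objects built above.)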




class ParallelDiscretization(object):
    @classmethod
    def my_debug_flags(cls):
        return set([
            "parallel_setup",
            ])

    @classmethod
    def all_debug_flags(cls, subcls):
        return cls.my_debug_flags() | subcls.all_debug_flags()

    def __init__(self, rcon, subdiscr_class, rank_data, *args, **kwargs):
        debug = set(kwargs.pop("debug", set()))
        self.debug = self.my_debug_flags() & debug
        kwargs["debug"] = debug - self.debug
        kwargs["run_context"] = rcon

        self.subdiscr = subdiscr_class(rank_data.mesh, *args, **kwargs)
        self.subdiscr.exec_mapper_class = make_custom_exec_mapper_class(
                self.subdiscr.exec_mapper_class)
        self.subdiscr.parallel_discr = self

        self.received_bdrys = {}
        self.context = rcon

        self.global2local_vertex_indices = rank_data.global2local_vertex_indices
        self.neighbor_ranks = rank_data.neighbor_ranks
        self.global_periodic_opposite_faces = rank_data.global_periodic_opposite_faces

        if rank_data.old_el_numbers is not None:
            from hedge.tools import reverse_lookup_table
            new_el_numbers = reverse_lookup_table(rank_data.old_el_numbers)
            self.global2local_elements = dict(
                    (gi, new_el_numbers[li])
                    for gi, li in rank_data.global2local_elements.iteritems())
        else:
            self.global2local_elements = rank_data.global2local_elements

        self._setup_neighbor_connections()

    def add_instrumentation(self, mgr):
        self.subdiscr.add_instrumentation(mgr)

        from pytools.log import EventCounter
        self.comm_flux_counter = EventCounter("n_comm_flux",
                "Number of inner flux communication runs")

        mgr.add_quantity(self.comm_flux_counter)

    def __len__(self):
        return len(self.subdiscr)

    def __getattr__(self, name):
        if not name.startswith("_"):
            return getattr(self.subdiscr, name)
        else:
            raise AttributeError(name)

    def _setup_neighbor_connections(self):
        comm = self.context.communicator

        # synchronize all ranks before the setup handshake below
        comm.barrier()

        if self.neighbor_ranks:
            # Send each neighbor the data it needs to match up its side of
            # the rank boundary with ours: global vertex numbers, node
            # coordinates and face size ("h") values for every shared face.

            from pytools import reverse_dictionary
            local2global_vertex_indices = \
                    reverse_dictionary(self.global2local_vertex_indices)

            send_requests = mpi.RequestList()

            for rank in self.neighbor_ranks:
                bdry_tag = hedge.mesh.TAG_RANK_BOUNDARY(rank)
                rank_bdry = self.subdiscr.mesh.tag_to_boundary[bdry_tag]
                rank_discr_boundary = self.subdiscr.get_boundary(bdry_tag)

                # a tuple of global vertex numbers for each face on the
                # boundary towards `rank'
                my_vertices_global = [
                        tuple(local2global_vertex_indices[vi]
                            for vi in el.faces[face_nr])
                        for el, face_nr in rank_bdry]

                # the node coordinates of each of these faces
                my_node_coords = []
                for el, face_nr in rank_bdry:
                    eslice, ldis = self.subdiscr.find_el_data(el.id)
                    findices = ldis.face_indices()[face_nr]

                    my_node_coords.append(
                            [self.nodes[eslice.start+i] for i in findices])

                # the face size ("h") of each of these faces
                my_h_values = [rank_discr_boundary.find_facepair_side(el_face).h
                        for el_face in rank_bdry]

                packet = (my_vertices_global, my_node_coords, my_h_values)

                send_requests.append(comm.isend(rank, 0, packet))

            received_packets = {}
            while len(received_packets) < len(self.neighbor_ranks):
                received_packet, status = comm.recv(tag=0, return_status=True)
                received_packets[status.source] = received_packet

            mpi.wait_all(send_requests)

            from pytools import flatten

            # map rank -> index map that pulls the neighbor's face-node data
            # into this rank's boundary ordering
            self.from_neighbor_maps = {}

            for rank, (nb_all_facevertices_global, nb_node_coords, nb_h_values) in \
                    received_packets.iteritems():
                bdry_tag = hedge.mesh.TAG_RANK_BOUNDARY(rank)
                rank_bdry = self.subdiscr.mesh.tag_to_boundary[bdry_tag]
                rank_discr_boundary = self.subdiscr.get_boundary(bdry_tag)

                flat_nb_node_coords = list(flatten(nb_node_coords))

                # step 1: find start node indices for each of the
                # neighbor's faces
                nb_face_starts = [0]
                for node_coords in nb_node_coords[:-1]:
                    nb_face_starts.append(
                            nb_face_starts[-1]+len(node_coords))

                # step 2: match faces by comparing vertex sets
                nb_face_order = dict(
                        (frozenset(vertices), i)
                        for i, vertices in enumerate(nb_all_facevertices_global))

                # step 3: build a list of indices into the data the
                # neighbor sent
                from_indices = []

                shuffled_indices_cache = {}

                def get_shuffled_indices(face_node_count, shuffle_op):
                    try:
                        return shuffled_indices_cache[shuffle_op]
                    except KeyError:
                        unshuffled_indices = range(face_node_count)
                        result = shuffled_indices_cache[shuffle_op] = \
                                shuffle_op(unshuffled_indices)
                        return result

                for el, face_nr in rank_bdry:
                    eslice, ldis = self.subdiscr.find_el_data(el.id)

                    my_vertices = el.faces[face_nr]
                    my_global_vertices = tuple(local2global_vertex_indices[vi]
                            for vi in my_vertices)

                    face_node_count = ldis.face_node_count()
                    try:
                        nb_face_idx = nb_face_order[frozenset(my_global_vertices)]
                        # handled in the `else' clause below
                    except KeyError:
                        # periodic face: the neighbor knows this face by the
                        # vertices of its periodic image
                        my_vertices_there, axis = self.global_periodic_opposite_faces[
                                my_global_vertices]
                        nb_face_idx = nb_face_order[frozenset(my_vertices_there)]

                        his_vertices_here, axis2 = self.global_periodic_opposite_faces[
                                nb_all_facevertices_global[nb_face_idx]]

                        assert axis == axis2

                        nb_face_start = nb_face_starts[nb_face_idx]

                        shuffle_op = \
                                ldis.get_face_index_shuffle_to_match(
                                        my_global_vertices,
                                        his_vertices_here)

                        shuffled_other_node_indices = [nb_face_start+i
                                for i in get_shuffled_indices(
                                    face_node_count, shuffle_op)]

                        from_indices.extend(shuffled_other_node_indices)

                        # check that the nodes really match up
                        if "parallel_setup" in self.debug:
                            my_node_indices = [eslice.start+i
                                    for i in ldis.face_indices()[face_nr]]

                            for my_i, other_i in zip(my_node_indices, shuffled_other_node_indices):
                                dist = self.nodes[my_i]-flat_nb_node_coords[other_i]
                                dist[axis] = 0
                                assert la.norm(dist) < 1e-14
                    else:
                        # non-periodic face
                        nb_global_vertices = nb_all_facevertices_global[nb_face_idx]

                        nb_face_start = nb_face_starts[nb_face_idx]

                        shuffle_op = \
                                ldis.get_face_index_shuffle_to_match(
                                        my_global_vertices,
                                        nb_global_vertices)

                        shuffled_other_node_indices = [nb_face_start+i
                                for i in get_shuffled_indices(
                                    face_node_count, shuffle_op)]

                        from_indices.extend(shuffled_other_node_indices)

                        # check that the nodes really match up
                        if "parallel_setup" in self.debug:
                            my_node_indices = [eslice.start+i
                                    for i in ldis.face_indices()[face_nr]]

                            for my_i, other_i in zip(my_node_indices, shuffled_other_node_indices):
                                dist = self.nodes[my_i]-flat_nb_node_coords[other_i]
                                assert la.norm(dist) < 1e-14

                    # unify the face size between the two ranks
                    nb_h = nb_h_values[nb_face_idx]
                    flux_face = rank_discr_boundary.find_facepair_side((el, face_nr))
                    flux_face.h = max(nb_h, flux_face.h)

                if "parallel_setup" in self.debug:
                    assert len(from_indices) == len(flat_nb_node_coords)

                # turn from_indices into a receive map usable by the
                # subdiscretization
                self.from_neighbor_maps[rank] = \
                        self.subdiscr.prepare_from_neighbor_map(from_indices)
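
    # The handshake above sends, for every rank boundary, the global vertex
    # numbers, node coordinates and face "h" values of the local faces, and
    # matches them against what the neighbor sent back. Worked example with
    # hypothetical numbers: if a local face has global vertices (7, 12, 4) and
    # the neighbor listed the same vertex set at position 2, with
    # nb_face_starts[2] == 6 and three nodes per face, then the shuffle
    # returned by get_face_index_shuffle_to_match reorders (0, 1, 2) into the
    # neighbor's node order, say (1, 2, 0), and from_indices gains [7, 8, 6]--
    # the positions of the matching nodes in the neighbor's flattened
    # face-node data.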
    def nodewise_inner_product(self, a, b):
        from boostmpi import all_reduce
        from operator import add

        return all_reduce(self.context.communicator,
                self.subdiscr.nodewise_inner_product(a, b),
                add)

    def norm(self, volume_vector, p=2):
        def add_norms(x, y):
            return (x**p + y**p)**(1/p)

        return mpi.all_reduce(self.context.communicator,
                self.subdiscr.norm(volume_vector, p),
                add_norms)
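
    # The add_norms combiner above is associative (up to roundoff):
    # ((x**p + y**p)**(1/p))**p + z**p == x**p + y**p + z**p, so folding the
    # per-rank norms pairwise yields the global p-norm
    # (sum over ranks of ||u_rank||_p**p)**(1/p), no matter how boostmpi
    # groups the reduction.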

    def integral(self, volume_vector):
        from operator import add
        return mpi.all_reduce(self.context.communicator,
                self.subdiscr.integral(volume_vector),
                add)

    def compile(self, optemplate, post_bind_mapper=lambda x: x):


def reassemble_volume_field(rcon, global_discr, local_discr, field):
    from pytools import reverse_dictionary
    local2global_element = reverse_dictionary(
            local_discr.global2local_elements)

    send_packet = {}
    for eg in local_discr.element_groups:
        for el, eslice in zip(eg.members, eg.ranges):
            send_packet[local2global_element[el.id]] = field[eslice]

    def reduction(a, b):
        a.update(b)
        return a

    gfield_parts = mpi.reduce(
            rcon.communicator, send_packet, reduction, rcon.head_rank)

    if rcon.is_head_rank:
        result = global_discr.volume_zeros()
        for eg in global_discr.element_groups:
            for el, eslice in zip(eg.members, eg.ranges):
                my_part = gfield_parts[el.id]
                assert len(my_part) == eslice.stop-eslice.start
                result[eslice] = my_part
        return result
    else:
        return None
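

# Example use (sketch): gather a distributed volume field onto the head rank,
# e.g. for output. Here "global_discr" is assumed to be a discretization built
# on the unpartitioned mesh and is only needed on the head rank; other ranks
# may pass None. "visualize" is a placeholder for whatever output routine is
# desired.
#
#     whole_u = reassemble_volume_field(rcon, global_discr, discr.subdiscr, u)
#     if rcon.is_head_rank:
#         visualize(whole_u)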