Epetra_MpiDistributor.cpp
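Before the listing, a minimal usage sketch. It is not part of the file; the driver, the variable names, and the chosen traffic pattern (every rank sends one double to rank 0) are illustrative assumptions, but the constructor, CreateFromSends, and Do signatures match the ones defined below, and the Deterministic flag is accepted but unused by this implementation.

    // Hypothetical, self-contained driver (names and values are illustrative).
    #include <mpi.h>
    #include "Epetra_MpiComm.h"
    #include "Epetra_MpiDistributor.h"

    int main(int argc, char** argv) {
      MPI_Init(&argc, &argv);
      {
        Epetra_MpiComm comm(MPI_COMM_WORLD);
        Epetra_MpiDistributor plan(comm);

        // Send one item to rank 0 from every rank (including rank 0 itself).
        int numExport = 1;
        int exportPIDs[1] = {0};
        int numRemote = 0;   // set by CreateFromSends: how many items this rank receives
        plan.CreateFromSends(numExport, exportPIDs, true, numRemote);

        double exportVals[1] = { (double) comm.MyPID() };
        int lenImport = 0;
        char* importBuf = 0;  // allocated (or grown) inside Do()
        plan.Do(reinterpret_cast<char*>(exportVals), (int) sizeof(double),
                lenImport, importBuf);
        // On rank 0, importBuf now holds one double per rank, grouped by source rank.
        delete [] importBuf;
      }
      MPI_Finalize();
      return 0;
    }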
1 //@HEADER
2 // ************************************************************************
3 //
4 // Epetra: Linear Algebra Services Package
5 // Copyright 2011 Sandia Corporation
6 //
7 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
8 // the U.S. Government retains certain rights in this software.
9 //
10 // Redistribution and use in source and binary forms, with or without
11 // modification, are permitted provided that the following conditions are
12 // met:
13 //
14 // 1. Redistributions of source code must retain the above copyright
15 // notice, this list of conditions and the following disclaimer.
16 //
17 // 2. Redistributions in binary form must reproduce the above copyright
18 // notice, this list of conditions and the following disclaimer in the
19 // documentation and/or other materials provided with the distribution.
20 //
21 // 3. Neither the name of the Corporation nor the names of the
22 // contributors may be used to endorse or promote products derived from
23 // this software without specific prior written permission.
24 //
25 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
26 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
28 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
29 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
30 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
31 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
32 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
33 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
34 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
35 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 //
37 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
38 //
39 // ************************************************************************
40 //@HEADER
41 
42 #include "Epetra_MpiDistributor.h"
43 #include "Epetra_MpiComm.h"
44 
45 #include <stdexcept>
46 #include <vector>
47 
48 //==============================================================================
49 Epetra_MpiDistributor::Epetra_MpiDistributor(const Epetra_MpiComm & Comm):
50  Epetra_Object("Epetra::MpiDistributor"),
51  lengths_to_(0),
52  procs_to_(0),
53  indices_to_(0),
54  size_indices_to_(0),
55  lengths_from_(0),
56  procs_from_(0),
57  indices_from_(0),
58  size_indices_from_(0),
59  resized_(false),
60  sizes_(0),
61  sizes_to_(0),
62  starts_to_(0),
63  starts_to_ptr_(0),
64  indices_to_ptr_(0),
65  sizes_from_(0),
66  starts_from_(0),
67  starts_from_ptr_(0),
68  indices_from_ptr_(0),
69  nrecvs_(0),
70  nsends_(0),
71  nexports_(0),
72  self_msg_(0),
73  max_send_length_(0),
74  total_recv_length_(0),
75  tag_(Comm.GetMpiTag()),
76  epComm_(&Comm),
77  comm_(Comm.GetMpiComm()),
78  request_(0),
79  status_(0),
80  no_delete_(false),
81  send_array_(0),
82  send_array_size_(0),
83  comm_plan_reverse_(0)
84 {
85 }
86 
87 //==============================================================================
88 Epetra_MpiDistributor::Epetra_MpiDistributor(const Epetra_MpiDistributor & Distributor):
89  Epetra_Object("Epetra::MpiDistributor"),
90  lengths_to_(0),
91  procs_to_(0),
92  indices_to_(0),
93  size_indices_to_(Distributor.size_indices_to_),
94  lengths_from_(0),
95  procs_from_(0),
96  indices_from_(0),
97  size_indices_from_(Distributor.size_indices_from_),
98  resized_(false),
99  sizes_(0),
100  sizes_to_(0),
101  starts_to_(0),
102  starts_to_ptr_(0),
103  indices_to_ptr_(0),
104  sizes_from_(0),
105  starts_from_(0),
106  starts_from_ptr_(0),
107  indices_from_ptr_(0),
108  nrecvs_(Distributor.nrecvs_),
109  nsends_(Distributor.nsends_),
110  nexports_(Distributor.nexports_),
111  self_msg_(Distributor.self_msg_),
112  max_send_length_(Distributor.max_send_length_),
113  total_recv_length_(Distributor.total_recv_length_),
114  tag_(Distributor.tag_),
115  epComm_(Distributor.epComm_),
116  comm_(Distributor.comm_),
117  request_(0),
118  status_(0),
119  no_delete_(0),// NOTE: Stuff is being copied... you can always delete it
120  send_array_(0),
121  send_array_size_(0),
122  comm_plan_reverse_(0)
123 {
124  int i;
125  if (nsends_>0) {
126  lengths_to_ = new int[nsends_];
127  procs_to_ = new int[nsends_];
128  starts_to_ = new int[nsends_];
129  for (i=0; i<nsends_; i++) {
130  lengths_to_[i] = Distributor.lengths_to_[i];
131  procs_to_[i] = Distributor.procs_to_[i];
132  starts_to_[i] = Distributor.starts_to_[i];
133  }
134  }
135  if (size_indices_to_>0) {
136  indices_to_ = new int[size_indices_to_];
137  for (i=0; i<size_indices_to_; i++) {
138  indices_to_[i] = Distributor.indices_to_[i];
139  }
140  indices_to_ptr_ = new int[nexports_];
141  for (i=0; i<nexports_; i++) {
142  indices_to_ptr_[i] = Distributor.indices_to_ptr_[i];
143  }
144  }
145 
146  if( nsends_+self_msg_ > 0 ) {
147  if(Distributor.sizes_to_) sizes_to_ = new int[nsends_+self_msg_];
148  if(Distributor.starts_to_ptr_) starts_to_ptr_ = new int[nsends_+self_msg_];
149  for(i=0;i<nsends_+self_msg_; i++) {
150  if(Distributor.sizes_to_) sizes_to_[i] = Distributor.sizes_to_[i];
151  if(Distributor.starts_to_ptr_) starts_to_ptr_[i] = Distributor.starts_to_ptr_[i];
152  }
153  }
154 
155  if (nrecvs_>0) {
156  lengths_from_ = new int[nrecvs_];
157  procs_from_ = new int[nrecvs_];
158  starts_from_ = new int[nrecvs_];
159  request_ = new MPI_Request[ nrecvs_ ];
160  status_ = new MPI_Status[ nrecvs_ ];
161  for (i=0; i<nrecvs_; i++) {
162  lengths_from_[i] = Distributor.lengths_from_[i];
163  procs_from_[i] = Distributor.procs_from_[i];
164  starts_from_[i] = Distributor.starts_from_[i];
165  }
166  }
167 
168  if (size_indices_from_>0) {
169  indices_from_ = new int[ size_indices_from_ ];
170  for (i=0; i<size_indices_from_; i++) {
171  indices_from_[i] = Distributor.indices_from_[i];
172  }
173  }
174 
175  if( nrecvs_+self_msg_ > 0 ) {
176  if(Distributor.sizes_from_) sizes_from_ = new int [nrecvs_+self_msg_];
177  if(Distributor.starts_from_ptr_) starts_from_ptr_ = new int [nrecvs_+self_msg_];
178  for(i=0;i<nrecvs_+self_msg_; i++) {
179  if(Distributor.sizes_from_) sizes_from_[i] = Distributor.sizes_from_[i];
180  if(Distributor.starts_from_ptr_) starts_from_ptr_[i] = Distributor.starts_from_ptr_[i];
181  }
182  }
183 
184  // Note: indices_from_ptr_ is always length zero...
185 
186 }
187 
188 //==============================================================================
189 Epetra_MpiDistributor::~Epetra_MpiDistributor()
190 {
191  if( !no_delete_ )
192  {
193  if( lengths_to_ != 0 ) delete [] lengths_to_;
194  if( procs_to_ != 0 ) delete [] procs_to_;
195  if( indices_to_ != 0 ) delete [] indices_to_;
196  if( lengths_from_ != 0 ) delete [] lengths_from_;
197  if( procs_from_ != 0 ) delete [] procs_from_;
198  if( indices_from_ != 0 ) delete [] indices_from_;
199  if( starts_to_ != 0 ) delete [] starts_to_;
200  if( starts_from_ != 0 ) delete [] starts_from_;
201  }
202 
203  if( sizes_ != 0 ) delete [] sizes_;
204  if( sizes_to_ != 0 ) delete [] sizes_to_;
205  if( sizes_from_ != 0 ) delete [] sizes_from_;
206  if( starts_to_ptr_ != 0 ) delete [] starts_to_ptr_;
207  if( starts_from_ptr_ != 0 ) delete [] starts_from_ptr_;
208  if( indices_to_ptr_ != 0 ) delete [] indices_to_ptr_;
209  if( indices_from_ptr_ != 0 ) delete [] indices_from_ptr_;
210 
211  if( request_ != 0 ) delete [] request_;
212  if( status_ != 0 ) delete [] status_;
213 
214  if( send_array_size_ != 0 ) { delete [] send_array_; send_array_size_ = 0; }
215 
216  if( comm_plan_reverse_ != 0 ) delete comm_plan_reverse_;
217 }
218 
219 
220 //==============================================================================
221 int Epetra_MpiDistributor::CreateFromSends( const int & NumExportIDs,
222  const int * ExportPIDs,
223  bool Deterministic,
224  int & NumRemoteIDs )
225 {
226  (void)Deterministic; // Prevent compiler warnings for unused argument.
227  int my_proc;
228  MPI_Comm_rank( comm_, &my_proc );
229 
230  int nprocs;
231  MPI_Comm_size( comm_, &nprocs );
232 
233  // Do the forward map component
234  CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
235 
236  //Invert map to see what msgs are received and what length
237  EPETRA_CHK_ERR( ComputeRecvs_( my_proc, nprocs ) );
238 
239  if (nrecvs_>0) {
240  if( !request_ ) {
241  request_ = new MPI_Request[ nrecvs_ ];
242  status_ = new MPI_Status[ nrecvs_ ];
243  }
244  }
245 
246  NumRemoteIDs = total_recv_length_;
247 
248  return 0;
249 }
250 
251 //==============================================================================
252 int Epetra_MpiDistributor::CreateFromRecvs( const int & NumRemoteIDs,
253  const int * RemoteGIDs,
254  const int * RemotePIDs,
255  bool Deterministic,
256  int & NumExportIDs,
257  int *& ExportGIDs,
258  int *& ExportPIDs )
259 {
260  int my_proc;
261  MPI_Comm_rank( comm_, &my_proc );
262 
263  int nprocs;
264  MPI_Comm_size( comm_, &nprocs );
265 
266  EPETRA_CHK_ERR( ComputeSends_( NumRemoteIDs, RemoteGIDs, RemotePIDs, NumExportIDs,
267  ExportGIDs, ExportPIDs, my_proc) );
268 
269  int testNumRemoteIDs;
270  EPETRA_CHK_ERR( CreateFromSends( NumExportIDs, ExportPIDs,
271  Deterministic, testNumRemoteIDs ) );
272 
273  return(0);
274 }
275 
276 //==============================================================================
277 //---------------------------------------------------------------------------
278 //CreateFromRecvs Method
279 // - create communication plan given a known list of procs to recv from
280 //---------------------------------------------------------------------------
281 #ifndef EPETRA_NO_64BIT_GLOBAL_INDICES
282 int Epetra_MpiDistributor::CreateFromRecvs( const int & NumRemoteIDs,
283  const long long * RemoteGIDs,
284  const int * RemotePIDs,
285  bool Deterministic,
286  int & NumExportIDs,
287  long long *& ExportGIDs,
288  int *& ExportPIDs )
289 {
290  int my_proc;
291  MPI_Comm_rank( comm_, &my_proc );
292 
293  int nprocs;
294  MPI_Comm_size( comm_, &nprocs );
295 
296  EPETRA_CHK_ERR( ComputeSends_( NumRemoteIDs, RemoteGIDs, RemotePIDs, NumExportIDs,
297  ExportGIDs, ExportPIDs, my_proc) );
298 
299  int testNumRemoteIDs;
300  EPETRA_CHK_ERR( CreateFromSends( NumExportIDs, ExportPIDs,
301  Deterministic, testNumRemoteIDs ) );
302 
303  return(0);
304 }
305 #endif
306 
307 
308 
309 //==============================================================================
310 //---------------------------------------------------------------------------
311 //CreateFromSendsAndRecvs Method
312 //---------------------------------------------------------------------------
313 int Epetra_MpiDistributor::CreateFromSendsAndRecvs( const int & NumExportIDs,
314  const int * ExportPIDs,
315  const int & NumRemoteIDs,
316  const int * RemoteGIDs,
317  const int * RemotePIDs,
318  bool Deterministic)
319 {
320  (void)RemoteGIDs;
321  (void)Deterministic; // Prevent compiler warnings for unused argument.
322  nexports_ = NumExportIDs;
323 
324  int my_proc;
325  MPI_Comm_rank( comm_, &my_proc );
326  int nprocs;
327  MPI_Comm_size( comm_, &nprocs );
328 
329  // Do the forward map component
330  CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
331 
332  // Do the reverse map component
333  CreateRecvStructures_(NumRemoteIDs,RemotePIDs);
334 
335  return 0;
336 }
337 #ifndef EPETRA_NO_64BIT_GLOBAL_INDICES
338 int Epetra_MpiDistributor::CreateFromSendsAndRecvs( const int & NumExportIDs,
339  const int * ExportPIDs,
340  const int & NumRemoteIDs,
341  const long long * RemoteGIDs,
342  const int * RemotePIDs,
343  bool Deterministic)
344 {
345  (void)RemoteGIDs;
346  (void)Deterministic; // Prevent compiler warnings for unused argument.
347  nexports_ = NumExportIDs;
348 
349  int my_proc;
350  MPI_Comm_rank( comm_, &my_proc );
351  int nprocs;
352  MPI_Comm_size( comm_, &nprocs );
353 
354  // Do the forward map component
355  CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
356 
357  // Do the reverse map component
358  CreateRecvStructures_(NumRemoteIDs,RemotePIDs);
359 
360  return 0;
361 
362 
363 }
364 #endif
365 
366 
367 
368 //==============================================================================
369 int Epetra_MpiDistributor::CreateSendStructures_(int my_proc,
370  int nprocs,
371  const int & NumExportIDs,
372  const int * ExportPIDs)
373 {
374  nexports_ = NumExportIDs;
375 
376  int i;
377 
378  // Check to see if items are grouped by processor w/o gaps
379  // If so, indices_to -> 0
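  // starts[] below first counts items per destination rank; in the general
  // (ungrouped) branch it is then converted to prefix offsets, counting-sort
  // style, to fill indices_to_. The grouped fast path needs no send buffer
  // and keeps only per-processor (proc, start, length) triples.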
380 
381  // Setup data structures for quick traversal of arrays
382  int * starts = new int[ nprocs + 1 ];
383  for( i = 0; i < nprocs; i++ )
384  starts[i] = 0;
385 
386  int nactive = 0;
387  bool no_send_buff = true;
388  int numDeadIndices = 0; // In some cases a GID will not be owned by any processor and its PID will be -1
389 
390  for( i = 0; i < NumExportIDs; i++ )
391  {
392  if( no_send_buff && i && (ExportPIDs[i] < ExportPIDs[i-1]) )
393  no_send_buff = false;
394  if( ExportPIDs[i] >= 0 )
395  {
396  ++starts[ ExportPIDs[i] ];
397  ++nactive;
398  }
399  else numDeadIndices++; // Increase the number of dead indices. Used below to leave these out of the analysis
400  }
401 
402  self_msg_ = ( starts[my_proc] != 0 ) ? 1 : 0;
403 
404  nsends_ = 0;
405 
406  if( no_send_buff ) //grouped by processor, no send buffer or indices_to_ needed
407  {
408  for( i = 0; i < nprocs; ++i )
409  if( starts[i] ) ++nsends_;
410 
411  if( nsends_ )
412  {
413  procs_to_ = new int[nsends_];
414  starts_to_ = new int[nsends_];
415  lengths_to_ = new int[nsends_];
416  }
417 
418  int index = numDeadIndices; // Leave off the dead indices (PID = -1)
419  int proc;
420  for( i = 0; i < nsends_; ++i )
421  {
422  starts_to_[i] = index;
423  proc = ExportPIDs[index];
424  procs_to_[i] = proc;
425  index += starts[proc];
426  }
427 
428  if( nsends_ )
429  Sort_ints_( procs_to_, starts_to_, nsends_ );
430 
431  max_send_length_ = 0;
432 
433  for( i = 0; i < nsends_; ++i )
434  {
435  proc = procs_to_[i];
436  lengths_to_[i] = starts[proc];
437  if( (proc != my_proc) && (lengths_to_[i] > max_send_length_) )
438  max_send_length_ = lengths_to_[i];
439  }
440  }
441  else //not grouped by processor, need send buffer and indices_to_
442  {
443  if( starts[0] != 0 ) nsends_ = 1;
444 
445  for( i = 1; i < nprocs; i++ )
446  {
447  if( starts[i] != 0 ) ++nsends_;
448  starts[i] += starts[i-1];
449  }
450 
451  for( i = nprocs-1; i != 0; i-- )
452  starts[i] = starts[i-1];
453 
454  starts[0] = 0;
455 
456  if (nactive>0) {
457  indices_to_ = new int[ nactive ];
458  size_indices_to_ = nactive;
459  }
460 
461  for( i = 0; i < NumExportIDs; i++ )
462  if( ExportPIDs[i] >= 0 )
463  {
464  indices_to_[ starts[ ExportPIDs[i] ] ] = i;
465  ++starts[ ExportPIDs[i] ];
466  }
467 
468  //Reconstruct starts array to index into indices_to.
469 
470  for( i = nprocs-1; i != 0; i-- )
471  starts[i] = starts[i-1];
472  starts[0] = 0;
473  starts[nprocs] = nactive;
474 
475  if (nsends_>0) {
476  lengths_to_ = new int[ nsends_ ];
477  procs_to_ = new int[ nsends_ ];
478  starts_to_ = new int[ nsends_ ];
479  }
480 
481  int j = 0;
482  max_send_length_ = 0;
483 
484  for( i = 0; i < nprocs; i++ )
485  if( starts[i+1] != starts[i] )
486  {
487  lengths_to_[j] = starts[i+1] - starts[i];
488  starts_to_[j] = starts[i];
489  if( ( i != my_proc ) && ( lengths_to_[j] > max_send_length_ ) )
490  max_send_length_ = lengths_to_[j];
491  procs_to_[j] = i;
492  j++;
493  }
494  }
495 
496  delete [] starts;
497 
498  nsends_ -= self_msg_;
499 
500  return 0;
501 }
502 
503 //==============================================================================
504 int Epetra_MpiDistributor::CreateRecvStructures_(const int & NumRemoteIDs,
505  const int * RemotePIDs)
506 {
507  int i, j;
508 
509  // Since the RemotePIDs should be sorted, counting the total number of recvs should be easy...
510  // use nsends as an initial guess for space.
511  std::vector<int> recv_list;
512  recv_list.reserve(nsends_);
513 
514  int last_pid=-2;
515  for(i=0; i<NumRemoteIDs; i++) {
516  if(RemotePIDs[i]>last_pid) {
517  recv_list.push_back(RemotePIDs[i]);
518  last_pid = RemotePIDs[i];
519  }
520  else if (RemotePIDs[i]<last_pid)
521  throw std::runtime_error("Epetra_MpiDistributor::CreateRecvStructures_ expected RemotePIDs to be in sorted order");
522  }
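  // Because RemotePIDs is sorted, each distinct PID recorded above is one
  // incoming message; the length of its run (measured below) becomes
  // lengths_from_ and its starting position becomes starts_from_.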
523  nrecvs_=recv_list.size();
524 
525  if (nrecvs_>0) {
526  starts_from_ = new int[nrecvs_];
527  procs_from_ = new int[nrecvs_];
528  lengths_from_ = new int[nrecvs_];
529  request_ = new MPI_Request[ nrecvs_ ];
530  status_ = new MPI_Status[ nrecvs_ ];
531  }
532 
533  for(i=0,j=0; i<nrecvs_; ++i) {
534  int jlast=j;
535  procs_from_[i] = recv_list[i];
536  starts_from_[i] = j;
537  for( ; j<NumRemoteIDs && RemotePIDs[jlast]==RemotePIDs[j] ; j++){;}
538  lengths_from_[i]=j-jlast;
539  }
540  total_recv_length_=NumRemoteIDs;
541 
542  nrecvs_ -= self_msg_;
543 
544  return 0;
545 }
546 
547 
548 //==============================================================================
549 //---------------------------------------------------------------------------
550 //ComputeRecvs Method
551 //---------------------------------------------------------------------------
552 int Epetra_MpiDistributor::ComputeRecvs_( int my_proc,
553  int nprocs )
554 {
555  int * msg_count = new int[ nprocs ];
556  int * counts = new int[ nprocs ];
557 
558  int i;
559  MPI_Status status;
560 
561  for( i = 0; i < nprocs; i++ )
562  {
563  msg_count[i] = 0;
564  counts[i] = 1;
565  }
566 
567  for( i = 0; i < nsends_+self_msg_; i++ )
568  msg_count[ procs_to_[i] ] = 1;
569 
570 #if defined(REDUCE_SCATTER_BUG)
571 // the bug is found in mpich on linux platforms
572  MPI_Reduce(msg_count, counts, nprocs, MPI_INT, MPI_SUM, 0, comm_);
573  MPI_Scatter(counts, 1, MPI_INT, &nrecvs_, 1, MPI_INT, 0, comm_);
574 #else
575  MPI_Reduce_scatter( msg_count, &nrecvs_, counts, MPI_INT, MPI_SUM, comm_ );
576 #endif
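  // At this point nrecvs_ holds the number of messages this rank will receive
  // (counting the self message, which is subtracted again at the end); the
  // per-message lengths and source ranks are exchanged point-to-point below.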
577 
578  delete [] msg_count;
579  delete [] counts;
580 
581  if (nrecvs_>0) {
582  lengths_from_ = new int[nrecvs_];
583  procs_from_ = new int[nrecvs_];
584  for(i=0; i<nrecvs_; ++i) {
585  lengths_from_[i] = 0;
586  procs_from_[i] = 0;
587  }
588  }
589 
590 #ifndef NEW_COMM_PATTERN
591  for( i = 0; i < (nsends_+self_msg_); i++ )
592  if( procs_to_[i] != my_proc ) {
593  MPI_Send( &(lengths_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_ );
594  }
595  else
596  {
597  //set self_msg_ to end block of recv arrays
598  lengths_from_[nrecvs_-1] = lengths_to_[i];
599  procs_from_[nrecvs_-1] = my_proc;
600  }
601 
602  for( i = 0; i < (nrecvs_-self_msg_); i++ )
603  {
604  MPI_Recv( &(lengths_from_[i]), 1, MPI_INT, MPI_ANY_SOURCE, tag_, comm_, &status );
605  procs_from_[i] = status.MPI_SOURCE;
606  }
607 
608  MPI_Barrier( comm_ );
609 #else
610  if (nrecvs_>0) {
611  if( !request_ ) {
612  request_ = new MPI_Request[nrecvs_-self_msg_];
613  status_ = new MPI_Status[nrecvs_-self_msg_];
614  }
615  }
616 
617  for( i = 0; i < (nrecvs_-self_msg_); i++ )
618  MPI_Irecv( &(lengths_from_[i]), 1, MPI_INT, MPI_ANY_SOURCE, tag_, comm_, &(request_[i]) );
619 
620  MPI_Barrier( comm_ );
621 
622  for( i = 0; i < (nsends_+self_msg_); i++ )
623  if( procs_to_[i] != my_proc ) {
624  MPI_Rsend( &(lengths_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_ );
625  }
626  else
627  {
628  //set self_msg_ to end block of recv arrays
629  lengths_from_[nrecvs_-1] = lengths_to_[i];
630  procs_from_[nrecvs_-1] = my_proc;
631  }
632 
633  if( (nrecvs_-self_msg_) > 0 ) MPI_Waitall( (nrecvs_-self_msg_), request_, status_ );
634 
635  for( i = 0; i < (nrecvs_-self_msg_); i++ )
636  procs_from_[i] = status_[i].MPI_SOURCE;
637 #endif
638 
639  Sort_ints_( procs_from_, lengths_from_, nrecvs_ );
640 
641  // Compute indices_from_
642  // Seems to break some rvs communication
643 /* Not necessary since rvs communication is always blocked
644  size_indices_from_ = 0;
645  if( nrecvs_ > 0 )
646  {
647  for( i = 0; i < nrecvs_; i++ ) size_indices_from_ += lengths_from_[i];
648  indices_from_ = new int[ size_indices_from_ ];
649 
650  for (i=0; i<size_indices_from_; i++) indices_from_[i] = i;
651  }
652 */
653 
654  if (nrecvs_>0) starts_from_ = new int[nrecvs_];
655  int j = 0;
656  for( i=0; i<nrecvs_; ++i )
657  {
658  starts_from_[i] = j;
659  j += lengths_from_[i];
660  }
661 
662  total_recv_length_ = 0;
663  for( i = 0; i < nrecvs_; i++ )
664  total_recv_length_ += lengths_from_[i];
665 
666  nrecvs_ -= self_msg_;
667 
668  MPI_Barrier( comm_ );
669 
670  return false;
671 }
672 
673 //==============================================================================
674 //---------------------------------------------------------------------------
675 //ComputeSends Method
676 //---------------------------------------------------------------------------
677 template<typename id_type>
678 int Epetra_MpiDistributor::ComputeSends_( int num_imports,
679  const id_type *& import_ids,
680  const int *& import_procs,
681  int & num_exports,
682  id_type *& export_ids,
683  int *& export_procs,
684  int my_proc ) {
685 
686  Epetra_MpiDistributor tmp_plan(*epComm_);
687  int i;
688 
689  int * proc_list = 0;
690  int * import_objs = 0;
691  char * c_export_objs = 0;
692  const int pack_size = (1 + sizeof(id_type)/sizeof(int));
693 
694  if( num_imports > 0 )
695  {
696  proc_list = new int[ num_imports ];
697  import_objs = new int[ num_imports * pack_size];
698 
699  for( i = 0; i < num_imports; i++ )
700  {
701  proc_list[i] = import_procs[i];
702 
703  *(id_type*)(import_objs + pack_size*i) = import_ids[i];
704  *(import_objs + pack_size*i + (pack_size-1)) = my_proc;
705  }
706  }
707 
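  // Each requested ID has been packed as (GID, requesting rank). Bouncing the
  // packed pairs through tmp_plan, which was built from the IDs' owning ranks,
  // hands every owner exactly its export list: the IDs to send and where to
  // send them.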
708  EPETRA_CHK_ERR(tmp_plan.CreateFromSends( num_imports, proc_list,
709  true, num_exports) );
710  if( num_exports > 0 )
711  {
712  //export_objs = new int[ 2 * num_exports ];
713  export_ids = new id_type[ num_exports ];
714  export_procs = new int[ num_exports ];
715  }
716  else
717  {
718  export_ids = 0;
719  export_procs = 0;
720  }
721 
722  int len_c_export_objs = 0;
723  EPETRA_CHK_ERR( tmp_plan.Do(reinterpret_cast<char *> (import_objs),
724  pack_size * (int)sizeof( int ),
725  len_c_export_objs,
726  c_export_objs) );
727  int * export_objs = reinterpret_cast<int *>(c_export_objs);
728 
729  for( i = 0; i < num_exports; i++ ) {
730  export_ids[i] = *(id_type*)(export_objs + pack_size*i);
731  export_procs[i] = *(export_objs + pack_size*i + (pack_size-1));
732  }
733 
734  if( proc_list != 0 ) delete [] proc_list;
735  if( import_objs != 0 ) delete [] import_objs;
736  if( len_c_export_objs != 0 ) delete [] c_export_objs;
737 
738  return(0);
739 
740 }
741 
742 //==============================================================================
743 int Epetra_MpiDistributor::Do( char * export_objs,
744  int obj_size,
745  int & len_import_objs,
746  char *& import_objs )
747 {
748  EPETRA_CHK_ERR( DoPosts(export_objs, obj_size, len_import_objs, import_objs) );
749  EPETRA_CHK_ERR( DoWaits() );
750 
751 
752  lastRoundBytesSend_ = 0;
753  for(int i=0; i<NumSends(); i++)
754  lastRoundBytesSend_ += lengths_to_[i]*obj_size;
755  lastRoundBytesRecv_ =len_import_objs;
756 
757  return(0);
758 }
759 
760 //==============================================================================
761 int Epetra_MpiDistributor::DoReverse( char * export_objs,
762  int obj_size,
763  int & len_import_objs,
764  char *& import_objs )
765 {
766  EPETRA_CHK_ERR( DoReversePosts(export_objs, obj_size,
767  len_import_objs, import_objs) );
768  EPETRA_CHK_ERR( DoReverseWaits() );
769 
770  // This is slightly weird, since DoReversePosts() actually calls DoPosts() on
771  // a reversed distributor
772  lastRoundBytesRecv_ = 0;
773  for(int i=0; i<NumReceives(); i++)
774  lastRoundBytesRecv_ += lengths_from_[i]*obj_size;
775  lastRoundBytesSend_ =len_import_objs;
776 
777  return(0);
778 }
779 
780 //==============================================================================
781 int Epetra_MpiDistributor::DoPosts( char * export_objs,
782  int obj_size,
783  int & len_import_objs,
784  char *& import_objs )
785 {
786  int i, j, k;
787 
788  int my_proc = 0;
789  int self_recv_address = 0;
790 
791  MPI_Comm_rank( comm_, &my_proc );
792 
793  if( len_import_objs < (total_recv_length_*obj_size) )
794  {
795  if( import_objs!=0 ) {delete [] import_objs; import_objs = 0;}
796  len_import_objs = total_recv_length_*obj_size;
797  if (len_import_objs>0) import_objs = new char[len_import_objs];
798  for( i=0; i<len_import_objs; ++i ) import_objs[i]=0;
799  }
800 
801  k = 0;
802 
803  j = 0;
804  for( i = 0; i < (nrecvs_+self_msg_); i++ )
805  {
806  if( procs_from_[i] != my_proc )
807  {
808  MPI_Irecv( &(import_objs[j]),
809  lengths_from_[i] * obj_size,
810  MPI_CHAR, procs_from_[i],
811  tag_, comm_,
812  &(request_[k]) );
813  k++;
814  }
815  else
816  self_recv_address = j;
817 
818  j += lengths_from_[i] * obj_size;
819  }
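  // All nonblocking receives are now posted into one contiguous buffer laid
  // out by source rank; self_recv_address marks where this rank's own data
  // will be copied with memcpy instead of MPI.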
820 
821 #ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
822  // NOTE (mfh 19 Mar 2012):
823  //
824  // The ready-sends below require that each ready-send's matching
825  // receive (see above) has already been posted. We ensure this with
826  // a barrier. (Otherwise, some process that doesn't need to post
827  // receives might post its ready-send before the receiving process
828  // gets to post its receive.) If you want to remove the barrier,
829  // you'll have to replace the ready-sends below with standard sends
830  // or Isends.
831  MPI_Barrier( comm_ );
832 #endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
833 
834  //setup scan through procs_to list starting w/ higher numbered procs
835  //Should help balance msg traffic
836  int nblocks = nsends_ + self_msg_;
837  int proc_index = 0;
838  while( proc_index < nblocks && procs_to_[proc_index] < my_proc )
839  ++proc_index;
840  if( proc_index == nblocks ) proc_index = 0;
841 
842  int self_num = 0, self_index = 0;
843  int p;
844 
845  if( !indices_to_ ) //data already blocked by processor
846  {
847  for( i = 0; i < nblocks; ++i )
848  {
849  p = i + proc_index;
850  if( p > (nblocks-1) ) p -= nblocks;
851 
852  if( procs_to_[p] != my_proc ) {
853 
854 #ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
855  MPI_Rsend( &export_objs[starts_to_[p]*obj_size],
856  lengths_to_[p]*obj_size,
857  MPI_CHAR,
858  procs_to_[p],
859  tag_,
860  comm_ );
861 #else
862  MPI_Send( &export_objs[starts_to_[p]*obj_size],
863  lengths_to_[p]*obj_size,
864  MPI_CHAR,
865  procs_to_[p],
866  tag_,
867  comm_ );
868 #endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
869  }
870  else {
871  self_num = p;
872  }
873  }
874 
875  if( self_msg_ )
876  memcpy( &import_objs[self_recv_address],
877  &export_objs[starts_to_[self_num]*obj_size],
878  lengths_to_[self_num]*obj_size );
879  }
880  else //data not blocked by proc, use send buffer
881  {
882  if( send_array_size_ < (max_send_length_*obj_size) )
883  {
884  if( send_array_!=0 ) {delete [] send_array_; send_array_ = 0;}
885  send_array_size_ = max_send_length_*obj_size;
886  if (send_array_size_>0) send_array_ = new char[send_array_size_];
887  }
888 
889  j = 0;
890  for( i = 0; i < nblocks; i++ )
891  {
892  p = i + proc_index;
893  if( p > (nblocks-1) ) p -= nblocks;
894  if( procs_to_[p] != my_proc )
895  {
896  int offset = 0;
897  j = starts_to_[p];
898  for( k = 0; k < lengths_to_[p]; k++ )
899  {
900  memcpy( &(send_array_[offset]),
901  &(export_objs[indices_to_[j]*obj_size]),
902  obj_size );
903  ++j;
904  offset += obj_size;
905  }
906 #ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
907  MPI_Rsend( send_array_,
908  lengths_to_[p] * obj_size,
909  MPI_CHAR,
910  procs_to_[p],
911  tag_, comm_ );
912 #else
913  MPI_Send( send_array_,
914  lengths_to_[p] * obj_size,
915  MPI_CHAR,
916  procs_to_[p],
917  tag_, comm_ );
918 #endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
919  }
920  else
921  {
922  self_num = p;
923  self_index = starts_to_[p];
924  }
925  }
926 
927  if( self_msg_ )
928  for( k = 0; k < lengths_to_[self_num]; k++ )
929  {
930  memcpy( &(import_objs[self_recv_address]),
931  &(export_objs[indices_to_[self_index]*obj_size]),
932  obj_size );
933  self_index++;
934  self_recv_address += obj_size;
935  }
936  }
937  return(0);
938 }
939 
940 //==============================================================================
941 int Epetra_MpiDistributor::DoWaits()
942 {
943  if( nrecvs_ > 0 ) MPI_Waitall( nrecvs_, request_, status_ );
944 
945  return(0);
946 }
947 
948 //==============================================================================
949 int Epetra_MpiDistributor::DoReversePosts( char * export_objs,
950  int obj_size,
951  int & len_import_objs,
952  char *& import_objs )
953 {
954  assert(indices_to_==0); //Can only do reverse comm when original data
955  // is blocked by processor
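  // (The reverse plan swaps this plan's send and receive structures, so it is
  // only well defined when the forward send side needed no indices_to_
  // reordering; see CreateReverseDistributor near the end of this file.)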
956 
957  // If we don't have a reverse distributor, make one.
958  if( comm_plan_reverse_ == 0 )
959  CreateReverseDistributor();
960 
961  int comm_flag = comm_plan_reverse_->DoPosts(export_objs, obj_size, len_import_objs, import_objs);
962 
963  return(comm_flag);
964 }
965 
966 //==============================================================================
967 int Epetra_MpiDistributor::DoReverseWaits()
968 {
969  if( comm_plan_reverse_ == 0 ) return (-1);
970 
971  int comm_flag = comm_plan_reverse_->DoWaits();
972 
973  return(comm_flag);
974 }
975 
976 //==============================================================================
977 //---------------------------------------------------------------------------
978 //Resize Method (Heaphy)
979 //---------------------------------------------------------------------------
980 int Epetra_MpiDistributor::Resize_( int * sizes )
981 {
982  int i, j, k; // loop counters
983  int sum;
984 
985  //if (sizes == 0) return 0;
986 
987  int my_proc;
988  MPI_Comm_rank (comm_, &my_proc);
989  int nprocs;
990  MPI_Comm_size( comm_, &nprocs );
991 
992  if( resized_ )
993  {
994  //test and see if we are already setup for these sizes
995  bool match = true;
996  for( i = 0; i < nexports_; ++i )
997  match = match && (sizes_[i]==sizes[i]);
998  int matched = match?1:0;
999  int match_count = 0;
1000  MPI_Allreduce( &matched, &match_count, 1, MPI_INT, MPI_SUM, comm_ );
1001  if( match_count == nprocs )
1002  return 0;
1003  else //reset existing sizing arrays
1004  max_send_length_ = 0;
1005  }
1006 
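  // Resize_ turns the per-object sizes[] into per-message totals
  // (sizes_to_/sizes_from_) and start offsets (starts_to_ptr_, starts_from_ptr_,
  // indices_to_ptr_) used by the variable-size DoPosts further below.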
1007  if( !sizes_ && nexports_ ) sizes_ = new int[nexports_];
1008  for (i = 0; i < nexports_; i++)
1009  sizes_[i] = sizes[i];
1010 
1011  if( !sizes_to_ && (nsends_+self_msg_) ) sizes_to_ = new int[nsends_+self_msg_];
1012  for (i = 0; i < (nsends_+self_msg_); ++i)
1013  sizes_to_[i] = 0;
1014 
1015  if( !starts_to_ptr_ && (nsends_+self_msg_) ) starts_to_ptr_ = new int[nsends_+self_msg_];
1016 
1017  if( !indices_to_ ) //blocked sends
1018  {
1019 
1020  int * index = 0;
1021  int * sort_val = 0;
1022  if (nsends_+self_msg_>0) {
1023  index = new int[nsends_+self_msg_];
1024  sort_val = new int[nsends_+self_msg_];
1025  }
1026  for( i = 0; i < (nsends_+self_msg_); ++i )
1027  {
1028  j = starts_to_[i];
1029  for( k = 0; k < lengths_to_[i]; ++k )
1030  sizes_to_[i] += sizes[j++];
1031  if( (sizes_to_[i] > max_send_length_) && (procs_to_[i] != my_proc) )
1032  max_send_length_ = sizes_to_[i];
1033  }
1034 
1035  for( i = 0; i < (nsends_+self_msg_); ++i )
1036  {
1037  sort_val[i] = starts_to_[i];
1038  index[i] = i;
1039  }
1040 
1041  if( nsends_+self_msg_ )
1042  Sort_ints_( sort_val, index, (nsends_+self_msg_) );
1043 
1044  sum = 0;
1045  for( i = 0; i < (nsends_+self_msg_); ++i )
1046  {
1047  starts_to_ptr_[ index[i] ] = sum;
1048  sum += sizes_to_[ index[i] ];
1049  }
1050 
1051  if (index!=0) {delete [] index; index = 0;}
1052  if (sort_val!=0) {delete [] sort_val; sort_val = 0;}
1053  }
1054  else //Sends not blocked, so have to do more work
1055  {
1056  if( !indices_to_ptr_ && nexports_ ) indices_to_ptr_ = new int[nexports_];
1057  int * offset = 0;
1058  if( nexports_ ) offset = new int[nexports_];
1059 
1060  //Compute address for every item in send array
1061  sum = 0;
1062  for( i = 0; i < nexports_; ++i )
1063  {
1064  offset[i] = sum;
1065  sum += sizes_[i];
1066  }
1067 
1068  sum = 0;
1069  max_send_length_ = 0;
1070  for( i = 0; i < (nsends_+self_msg_); ++i )
1071  {
1072  starts_to_ptr_[i] = sum;
1073  for( j = starts_to_[i]; j < (starts_to_[i]+lengths_to_[i]); ++j )
1074  {
1075  indices_to_ptr_[j] = offset[ indices_to_[j] ];
1076  sizes_to_[i] += sizes_[ indices_to_[j] ];
1077  }
1078  if( sizes_to_[i] > max_send_length_ && procs_to_[i] != my_proc )
1079  max_send_length_ = sizes_to_[i];
1080  sum += sizes_to_[i];
1081  }
1082 
1083  if (offset!=0) {delete [] offset; offset = 0;}
1084  }
1085 
1086  // Exchange sizes routine inserted here:
1087  int self_index_to = -1;
1088  total_recv_length_ = 0;
1089  if( !sizes_from_ && (nrecvs_+self_msg_) ) sizes_from_ = new int [nrecvs_+self_msg_];
1090 
1091 #ifndef EPETRA_NEW_COMM_PATTERN
1092  for (i = 0; i < (nsends_+self_msg_); i++)
1093  {
1094  if(procs_to_[i] != my_proc)
1095  MPI_Send ((void *) &(sizes_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_);
1096  else
1097  self_index_to = i;
1098  }
1099 
1100  MPI_Status status;
1101  for (i = 0; i < (nrecvs_+self_msg_); ++i)
1102  {
1103  sizes_from_[i] = 0;
1104  if (procs_from_[i] != my_proc)
1105  MPI_Recv((void *) &(sizes_from_[i]), 1, MPI_INT, procs_from_[i], tag_, comm_, &status);
1106  else
1107  sizes_from_[i] = sizes_to_[self_index_to];
1108  total_recv_length_ += sizes_from_[i];
1109  }
1110 #else
1111  if (nrecvs_>0 && !request_) {
1112  request_ = new MPI_Request[ nrecvs_-self_msg_ ];
1113  status_ = new MPI_Status[ nrecvs_-self_msg_ ];
1114  }
1115 
1116  for (i = 0; i < (nsends_+self_msg_); i++)
1117  {
1118  if(procs_to_[i] == my_proc)
1119  self_index_to = i;
1120  }
1121 
1122  for (i = 0; i < (nrecvs_+self_msg_); ++i)
1123  {
1124  sizes_from_[i] = 0;
1125  if (procs_from_[i] != my_proc)
1126  MPI_Irecv((void *) &(sizes_from_[i]), 1, MPI_INT, procs_from_[i], tag_, comm_, &(request_[i]));
1127  else
1128  {
1129  sizes_from_[i] = sizes_to_[self_index_to];
1130  total_recv_length_ += sizes_from_[i];
1131  }
1132  }
1133 
1134  MPI_Barrier( comm_ );
1135 
1136  for (i = 0; i < (nsends_+self_msg_); i++)
1137  {
1138  if(procs_to_[i] != my_proc)
1139  MPI_Rsend ((void *) &(sizes_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_);
1140  }
1141 
1142  if( nrecvs_ > 0 ) MPI_Waitall( nrecvs_, request_, status_ );
1143 
1144  for (i = 0; i < (nrecvs_+self_msg_); ++i)
1145  {
1146  if (procs_from_[i] != my_proc)
1147  total_recv_length_ += sizes_from_[i];
1148  }
1149 #endif
1150  // end of exchanges sizes insert
1151 
1152  sum = 0;
1153  if( !starts_from_ptr_ ) starts_from_ptr_ = new int[nrecvs_+self_msg_];
1154  for (i = 0; i < (nrecvs_+self_msg_); ++i)
1155  {
1156  starts_from_ptr_[i] = sum;
1157  sum += sizes_from_[i];
1158  }
1159 
1160  resized_ = true;
1161 
1162  return 0;
1163 }
1164 
1165 //==============================================================================
1166 int Epetra_MpiDistributor::Do( char * export_objs,
1167  int obj_size,
1168  int *& sizes,
1169  int & len_import_objs,
1170  char *& import_objs )
1171 {
1172  EPETRA_CHK_ERR( DoPosts(export_objs, obj_size, sizes,
1173  len_import_objs, import_objs) );
1174  EPETRA_CHK_ERR( DoWaits() );
1175 
1176  return(0);
1177 }
1178 
1179 //==============================================================================
1180 int Epetra_MpiDistributor::DoReverse( char * export_objs,
1181  int obj_size,
1182  int *& sizes,
1183  int & len_import_objs,
1184  char *& import_objs )
1185 {
1186  EPETRA_CHK_ERR( DoReversePosts(export_objs, obj_size, sizes,
1187  len_import_objs, import_objs) );
1188  EPETRA_CHK_ERR( DoReverseWaits() );
1189 
1190  return(0);
1191 }
1192 
1193 //==============================================================================
1194 int Epetra_MpiDistributor::DoPosts( char * export_objs,
1195  int obj_size,
1196  int *& sizes,
1197  int & len_import_objs,
1198  char *& import_objs )
1199 {
1200  int ierr = Resize_(sizes);
1201  if (ierr != 0) {
1202  return(ierr);
1203  }
1204 
1205  MPI_Barrier( comm_ );
1206 
1207  int i, j, k;
1208 
1209  int my_proc = 0;
1210  int self_recv_address = 0;
1211 
1212  MPI_Comm_rank( comm_, &my_proc );
1213 
1214  if( len_import_objs < (total_recv_length_*obj_size) )
1215  {
1216  if( import_objs!=0 ) {delete [] import_objs; import_objs = 0;}
1217  len_import_objs = total_recv_length_*obj_size;
1218  if (len_import_objs>0) import_objs = new char[len_import_objs];
1219  }
1220 
1221  k = 0;
1222 
1223  for( i = 0; i < (nrecvs_+self_msg_); ++i )
1224  {
1225  if( procs_from_[i] != my_proc )
1226  {
1227  MPI_Irecv( &(import_objs[starts_from_ptr_[i] * obj_size]),
1228  sizes_from_[i] * obj_size,
1229  MPI_CHAR, procs_from_[i], tag_, comm_, &(request_[k]) );
1230  k++;
1231  }
1232  else
1233  self_recv_address = starts_from_ptr_[i] * obj_size;
1234  }
1235 
1236  MPI_Barrier( comm_ );
1237 
1238  //setup scan through procs_to list starting w/ higher numbered procs
1239  //Should help balance msg traffic
1240  int nblocks = nsends_ + self_msg_;
1241  int proc_index = 0;
1242  while( proc_index < nblocks && procs_to_[proc_index] < my_proc )
1243  ++proc_index;
1244  if( proc_index == nblocks ) proc_index = 0;
1245 
1246  int self_num = 0;
1247  int p;
1248 
1249  if( !indices_to_ ) //data already blocked by processor
1250  {
1251  for( i = 0; i < nblocks; ++i )
1252  {
1253  p = i + proc_index;
1254  if( p > (nblocks-1) ) p -= nblocks;
1255 
1256  if( procs_to_[p] != my_proc )
1257  MPI_Rsend( &export_objs[starts_to_ptr_[p]*obj_size],
1258  sizes_to_[p]*obj_size,
1259  MPI_CHAR,
1260  procs_to_[p],
1261  tag_,
1262  comm_ );
1263  else
1264  self_num = p;
1265  }
1266 
1267  if( self_msg_ )
1268  memcpy( &import_objs[self_recv_address],
1269  &export_objs[starts_to_ptr_[self_num]*obj_size],
1270  sizes_to_[self_num]*obj_size );
1271  }
1272  else //data not blocked by proc, need to copy to buffer
1273  {
1274  if( send_array_size_ && send_array_size_ < (max_send_length_*obj_size) )
1275  {
1276  if (send_array_!=0) {delete [] send_array_; send_array_ = 0;}
1277  send_array_ = 0;
1278  send_array_size_ = 0;
1279  }
1280  if( !send_array_size_ )
1281  {
1282  send_array_size_ = max_send_length_*obj_size;
1283  if (send_array_size_>0) send_array_ = new char[send_array_size_];
1284  }
1285 
1286  for( i=0; i<nblocks; ++i )
1287  {
1288  p = i + proc_index;
1289  if( p > (nblocks-1) ) p -= nblocks;
1290 
1291  if( procs_to_[p] != my_proc )
1292  {
1293  int offset = 0;
1294  j = starts_to_[p];
1295  for( k=0; k<lengths_to_[p]; ++k )
1296  {
1297  memcpy( &send_array_[offset],
1298  &export_objs[indices_to_ptr_[j]*obj_size],
1299  sizes_[indices_to_[j]]*obj_size );
1300  offset += sizes_[indices_to_[j]]*obj_size;
1301  ++j;
1302  }
1303  MPI_Rsend( send_array_, sizes_to_[p]*obj_size,
1304  MPI_CHAR, procs_to_[p], tag_, comm_ );
1305  }
1306  else
1307  self_num = p;
1308  }
1309 
1310  if( self_msg_ )
1311  {
1312  int jj;
1313  j = starts_to_[self_num];
1314  for( k=0; k<lengths_to_[self_num]; ++k )
1315  {
1316  jj = indices_to_ptr_[j];
1317  memcpy( &import_objs[self_recv_address],
1318  &export_objs[jj*obj_size],
1319  sizes_[indices_to_[j]]*obj_size );
1320  self_recv_address += (obj_size*sizes_[indices_to_[j]]);
1321  ++j;
1322  }
1323  }
1324  }
1325 
1326  return(0);
1327 }
1328 
1329 //==============================================================================
1330 int Epetra_MpiDistributor::DoReversePosts( char * export_objs,
1331  int obj_size,
1332  int *& sizes,
1333  int & len_import_objs,
1334  char *& import_objs )
1335 {
1336  assert(indices_to_==0); //Can only do reverse comm when original data
1337  // is blocked by processor
1338 
1339  // If we don't have a reverse distributor, make one.
1340  if( comm_plan_reverse_ == 0 )
1341  CreateReverseDistributor();
1342 
1343  int comm_flag = comm_plan_reverse_->DoPosts(export_objs, obj_size, sizes, len_import_objs, import_objs);
1344 
1345  return(comm_flag);
1346 }
1347 
1348 //==============================================================================
1349 void Epetra_MpiDistributor::Print(std::ostream & os) const
1350 {
1351  using std::endl;
1352 
1353  int myRank = 0, numProcs = 1;
1354  MPI_Comm_rank (comm_, &myRank);
1355  MPI_Comm_size (comm_, &numProcs);
1356 
1357  if (myRank == 0) {
1358  os << "Epetra_MpiDistributor (implements Epetra_Distributor)" << std::endl;
1359  }
1360  // Let each MPI process print its data. We assume that all
1361  // processes can print to the given output stream, and execute
1362  // barriers to make it more likely that the output will be in the
1363  // right order.
1364  for (int p = 0; p < numProcs; ++p) {
1365  if (myRank == p) {
1366  os << "[Node " << p << " of " << numProcs << "]" << std::endl;
1367  os << " selfMessage: " << self_msg_ << std::endl;
1368  os << " numSends: " << nsends_ << std::endl;
1369 
1370  os << " imagesTo: [";
1371  for (int i = 0; i < nsends_; ++i) {
1372  os << procs_to_[i];
1373  if (i < nsends_ - 1) {
1374  os << " ";
1375  }
1376  }
1377  os << "]" << std::endl;
1378 
1379  os << " lengthsTo: [";
1380  for (int i = 0; i < nsends_; ++i) {
1381  os << lengths_to_[i];
1382  if (i < nsends_ - 1) {
1383  os << " ";
1384  }
1385  }
1386  os << "]" << std::endl;
1387 
1388  os << " maxSendLength: " << max_send_length_ << std::endl;
1389 
1390  os << " startsTo: ";
1391  if (starts_to_ == NULL) {
1392  os << "(NULL)" << std::endl;
1393  } else {
1394  os << "[";
1395  for (int i = 0; i < nsends_; ++i) {
1396  os << starts_to_[i];
1397  if (i < nsends_ - 1) {
1398  os << " ";
1399  }
1400  }
1401  os << "]" << std::endl;
1402  }
1403 
1404  os << " indicesTo: ";
1405  if (indices_to_ == NULL) {
1406  os << "(NULL)" << std::endl;
1407  } else {
1408  os << "[";
1409  int k = 0;
1410  for (int i = 0; i < nsends_; ++i) {
1411  for (int j = 0; j < lengths_to_[i]; ++j) {
1412  os << " " << indices_to_[j+k];
1413  }
1414  k += lengths_to_[i];
1415  }
1416  os << "]" << std::endl;
1417  }
1418 
1419  os << " numReceives: " << nrecvs_ << std::endl;
1420  os << " totalReceiveLength: " << total_recv_length_ << std::endl;
1421 
1422  os << " lengthsFrom: [";
1423  for (int i = 0; i < nrecvs_; ++i) {
1424  os << lengths_from_[i];
1425  if (i < nrecvs_ - 1) {
1426  os << " ";
1427  }
1428  }
1429  os << "]" << std::endl;
1430 
1431  os << " startsFrom: [";
1432  for (int i = 0; i < nrecvs_; ++i) {
1433  os << starts_from_[i];
1434  if (i < nrecvs_ - 1) {
1435  os << " ";
1436  }
1437  }
1438  os << "]" << std::endl;
1439 
1440  os << " imagesFrom: [";
1441  for (int i = 0; i < nrecvs_; ++i) {
1442  os << procs_from_[i];
1443  if (i < nrecvs_ - 1) {
1444  os << " ";
1445  }
1446  }
1447  os << "]" << std::endl;
1448 
1449  // mfh 16 Dec 2011: I found this commented out here; not sure if
1450  // we want to print this, so I'm leaving it commented out.
1451  /*
1452  os << "indices_from: ";
1453  k = 0;
1454  for( i = 0; i < nrecvs_; i++ )
1455  {
1456  for( j = 0; j < lengths_from_[i]; j++ )
1457  os << " " << indices_from_[j+k];
1458  k += lengths_from_[i];
1459  }
1460  */
1461 
1462  // Last output is a flush; it leaves a space and also
1463  // helps synchronize output.
1464  os << std::flush;
1465  } // if it's my process' turn to print
1466 
1467  // Execute barriers to give output time to synchronize.
1468  // One barrier generally isn't enough.
1469  MPI_Barrier (comm_);
1470  MPI_Barrier (comm_);
1471  MPI_Barrier (comm_);
1472  }
1473 }
1474 
1475 //---------------------------------------------------------------------------
1476 int Epetra_MpiDistributor::Sort_ints_(
1477  int *vals_sort, // values to be sorted
1478  int *vals_other, // other array to be reordered with sort
1479  int nvals) // length of these two arrays
1480 {
1481 // It is primarily used to sort messages to improve communication flow.
1482 // This routine will also ensure that the ordering produced by the invert_map
1483 // routines is deterministic. This should make bugs more reproducible. This
1484 // is accomplished by sorting the message lists by processor ID.
1485 // This is a distribution count sort algorithm (see Knuth)
1486 // This version assumes non-negative integers.
1487 
1488  if (nvals <= 1) return 0;
1489 
1490  int i; // loop counter
1491 
1492  // find largest int, n, to size sorting array, then allocate and clear it
1493  int n = 0;
1494  for (i = 0; i < nvals; i++)
1495  if (n < vals_sort[i]) n = vals_sort[i];
1496  int *pos = new int [n+2];
1497  for (i = 0; i < n+2; i++) pos[i] = 0;
1498 
1499  // copy input arrays into temporary copies to allow sorting original arrays
1500  int *copy_sort = new int [nvals];
1501  int *copy_other = new int [nvals];
1502  for (i = 0; i < nvals; i++)
1503  {
1504  copy_sort[i] = vals_sort[i];
1505  copy_other[i] = vals_other[i];
1506  }
1507 
1508  // count the occurrences of integers ("distribution count")
1509  int *p = pos+1;
1510  for (i = 0; i < nvals; i++) p[copy_sort[i]]++;
1511 
1512  // create the partial sum of distribution counts
1513  for (i = 1; i < n; i++) p[i] += p[i-1];
1514 
1515  // the shifted partial sum is the index to store the data in sort order
1516  p = pos;
1517  for (i = 0; i < nvals; i++)
1518  {
1519  vals_sort [p[copy_sort [i]]] = copy_sort[i];
1520  vals_other [p[copy_sort [i]]++] = copy_other[i];
1521  }
1522 
1523  delete [] copy_sort;
1524  delete [] copy_other;
1525  delete [] pos;
1526 
1527  return 0;
1528 }
1529 
1530 //-------------------------------------------------------------------------
1531 Epetra_MpiDistributor& Epetra_MpiDistributor::operator=(const Epetra_MpiDistributor & src)
1532 {
1533  (void)src;
1534  //not currently supported
1535  bool throw_error = true;
1536  if (throw_error) {
1537  throw ReportError("Epetra_MpiDistributor::operator= not supported.",-1);
1538  }
1539  return( *this );
1540 }
1541 
1542 
1543 //-------------------------------------------------------------------------
1544 void Epetra_MpiDistributor::CreateReverseDistributor() {
1545  int i;
1546  int my_proc = 0;
1547 
1548  MPI_Comm_rank( comm_, &my_proc );
1549 
1550  if( comm_plan_reverse_ == 0 ) {
1551  int total_send_length = 0;
1552  for( i = 0; i < nsends_+self_msg_; i++ )
1553  total_send_length += lengths_to_[i];
1554 
1555  int max_recv_length = 0;
1556  for( i = 0; i < nrecvs_; i++ )
1557  if( procs_from_[i] != my_proc )
1558  if( lengths_from_[i] > max_recv_length )
1559  max_recv_length = lengths_from_[i];
1560 
1561  comm_plan_reverse_ = new Epetra_MpiDistributor(*epComm_);
1562 
1563  comm_plan_reverse_->lengths_to_ = lengths_from_;
1564  comm_plan_reverse_->procs_to_ = procs_from_;
1565  comm_plan_reverse_->indices_to_ = indices_from_;
1566  comm_plan_reverse_->starts_to_ = starts_from_;
1567 
1568  comm_plan_reverse_->lengths_from_ = lengths_to_;
1569  comm_plan_reverse_->procs_from_ = procs_to_;
1570  comm_plan_reverse_->indices_from_ = indices_to_;
1571  comm_plan_reverse_->starts_from_ = starts_to_;
1572 
1573  comm_plan_reverse_->nsends_ = nrecvs_;
1574  comm_plan_reverse_->nrecvs_ = nsends_;
1575  comm_plan_reverse_->self_msg_ = self_msg_;
1576 
1577  comm_plan_reverse_->max_send_length_ = max_recv_length;
1578  comm_plan_reverse_->total_recv_length_ = total_send_length;
1579 
1580  comm_plan_reverse_->request_ = new MPI_Request[ comm_plan_reverse_->nrecvs_ ];
1581  comm_plan_reverse_->status_ = new MPI_Status[ comm_plan_reverse_->nrecvs_ ];
1582 
1583  comm_plan_reverse_->no_delete_ = true;
1584  }
1585 
1586 }
1587 
1588 //-------------------------------------------------------------------------
1589 Epetra_Distributor * Epetra_MpiDistributor::ReverseClone() {
1590  if(comm_plan_reverse_==0)
1591  CreateReverseDistributor();
1592 
1593  return(dynamic_cast<Epetra_Distributor *>(new Epetra_MpiDistributor(*comm_plan_reverse_)));
1594 }