Tpetra_Details_DistributorPlan.cpp
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 #include "Tpetra_Details_DistributorPlan.hpp"
11 
12 #include "Teuchos_StandardParameterEntryValidators.hpp"
13 #include "Tpetra_Util.hpp"
14 #include "Tpetra_Details_Behavior.hpp"
15 #include <numeric>
16 
17 namespace Tpetra {
18 namespace Details {
19 
20 std::string
21 DistributorSendTypeEnumToString(EDistributorSendType sendType)
22 {
23  if (sendType == DISTRIBUTOR_ISEND) {
24  return "Isend";
25  }
26  else if (sendType == DISTRIBUTOR_SEND) {
27  return "Send";
28  }
29  else if (sendType == DISTRIBUTOR_ALLTOALL) {
30  return "Alltoall";
31  }
32 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
33  else if (sendType == DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
34  return "MpiAdvanceAlltoall";
35  }
36  else if (sendType == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
37  return "MpiAdvanceNbralltoallv";
38  }
39 #endif
40  else {
41  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
42  "EDistributorSendType enum value " << sendType << ".");
43  }
44 }
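
A minimal usage sketch (illustrative only, not part of this file): the converter is convenient for logging which send variant a plan will use. The helper name below is hypothetical, and the snippet assumes Tpetra_Details_DistributorPlan.hpp has been included.

#include <iostream>

// Hypothetical logging helper.  DistributorSendTypeEnumToString throws
// std::invalid_argument for values outside the enum, so no extra checking
// is needed here.
void printSendType (Tpetra::Details::EDistributorSendType t) {
  std::cout << "Send type: "
            << Tpetra::Details::DistributorSendTypeEnumToString (t)
            << std::endl;  // e.g. "Send type: Isend" for DISTRIBUTOR_ISEND
}
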
45 
46 std::string
47 DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
48 {
49  switch (how) {
50  case Details::DISTRIBUTOR_NOT_INITIALIZED:
51  return "Not initialized yet";
52  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
53  return "By createFromSends";
54  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
55  return "By createFromRecvs";
56  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
57  return "By createFromSendsAndRecvs";
58  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
59  return "By createReverseDistributor";
60  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
61  return "By copy constructor";
62  default:
63  return "INVALID";
64  }
65 }
66 
67 DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
68  : comm_(comm),
69 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
70  mpixComm_(Teuchos::null),
71 #endif
72  howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
73  reversePlan_(Teuchos::null),
74  sendType_(DISTRIBUTOR_SEND),
75  sendMessageToSelf_(false),
76  numSendsToOtherProcs_(0),
77  maxSendLength_(0),
78  numReceives_(0),
79  totalReceiveLength_(0)
80 { }
81 
82 DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
83  : comm_(otherPlan.comm_),
84 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
85  mpixComm_(otherPlan.mpixComm_),
86 #endif
87  howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
88  reversePlan_(otherPlan.reversePlan_),
89  sendType_(otherPlan.sendType_),
90  sendMessageToSelf_(otherPlan.sendMessageToSelf_),
91  numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
92  procIdsToSendTo_(otherPlan.procIdsToSendTo_),
93  startsTo_(otherPlan.startsTo_),
94  lengthsTo_(otherPlan.lengthsTo_),
95  maxSendLength_(otherPlan.maxSendLength_),
96  indicesTo_(otherPlan.indicesTo_),
97  numReceives_(otherPlan.numReceives_),
98  totalReceiveLength_(otherPlan.totalReceiveLength_),
99  lengthsFrom_(otherPlan.lengthsFrom_),
100  procsFrom_(otherPlan.procsFrom_),
101  startsFrom_(otherPlan.startsFrom_),
102  indicesFrom_(otherPlan.indicesFrom_)
103 { }
104 
105 size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs) {
106  using Teuchos::outArg;
107  using Teuchos::REDUCE_MAX;
108  using Teuchos::reduceAll;
109  using std::endl;
110  const char rawPrefix[] = "Tpetra::DistributorPlan::createFromSends";
111 
112  const size_t numExports = exportProcIDs.size();
113  const int myProcID = comm_->getRank();
114  const int numProcs = comm_->getSize();
115  const bool debug = Details::Behavior::debug("Distributor");
116 
117  // exportProcIDs tells us the communication pattern for this
118  // distributor. It dictates the way that the export data will be
119  // interpreted in doPosts(). We want to perform at most one
120  // send per process in doPosts; this is for two reasons:
121  // * minimize latency / overhead in the comm routines (nice)
122  // * match the number of receives and sends between processes
123  // (necessary)
124  //
125  // Teuchos::Comm requires that the data for a send are contiguous
126  // in a send buffer. Therefore, if the data in the send buffer
127  // for doPosts() are not contiguous, they will need to be copied
128  // into a contiguous buffer. The user has specified this
129  // noncontiguous pattern and we can't do anything about it.
130  // However, if they do not provide an efficient pattern, we will
131  // warn them if one of the following compile-time options has been
132  // set:
133  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
134  //
135  // If the data are contiguous, then we can post the sends in situ
136  // (i.e., without needing to copy them into a send buffer).
137  //
138  // Determine contiguity. There are a number of ways to do this:
139  // * If the export IDs are sorted, then all exports to a
140  // particular proc must be contiguous. This is what Epetra does.
141  // * If the export ID of the current export already has been
142  // listed, then the previous listing should correspond to the
143  // same export. This tests contiguity, but not sortedness.
144  //
145  // Both of these tests require O(n), where n is the number of
146  // exports. However, the latter will positively identify a greater
147  // portion of contiguous patterns. We use the latter method.
148  //
149  // Check to see if values are grouped by procs without gaps
150  // If so, indices_to -> 0.
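  // Example (illustrative): exportProcIDs = {0, 0, 2, 2, 5} is grouped by
  // destination, so the exports can be sent in place; exportProcIDs =
  // {0, 2, 0} is not (the second 0 follows a 2), so a send buffer and
  // indicesTo_ will be needed.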
151 
152  if (debug) {
153  // Test whether any process in the communicator got an invalid
154  // process ID. If badID != -1 on this process, then it equals
155  // this process' rank. The max of all badID over all processes
156  // is the max rank which has an invalid process ID.
157  int badID = -1;
158  for (size_t i = 0; i < numExports; ++i) {
159  const int exportID = exportProcIDs[i];
160  if (exportID >= numProcs || exportID < 0) {
161  badID = myProcID;
162  break;
163  }
164  }
165  int gbl_badID;
166  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
167  TEUCHOS_TEST_FOR_EXCEPTION
168  (gbl_badID >= 0, std::runtime_error, rawPrefix << "Proc "
169  << gbl_badID << ", perhaps among other processes, got a bad "
170  "send process ID.");
171  }
172 
173  // Set up data structures for quick traversal of arrays.
174  // This contains the number of sends for each process ID.
175  //
176  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
177  // that create an array of length the number of processes in the
178  // communicator (plus one). Given how this code uses this array,
179  // it should be straightforward to replace it with a hash table or
180  // some other more space-efficient data structure. In practice,
181  // most of the entries of starts should be zero for a sufficiently
182  // large process count, unless the communication pattern is dense.
183  // Note that it's important to be able to iterate through keys (i
184  // for which starts[i] is nonzero) in increasing order.
185  Teuchos::Array<size_t> starts (numProcs + 1, 0);
186 
187  // numActive is the number of sends that are not Null
188  size_t numActive = 0;
189  int needSendBuff = 0; // Boolean
190 
191  for (size_t i = 0; i < numExports; ++i) {
192  const int exportID = exportProcIDs[i];
193  if (exportID >= 0) {
194  // exportID is a valid process ID. Increment the number of
195  // messages this process will send to that process.
196  ++starts[exportID];
197 
198  // If we're sending more than one message to process exportID,
199  // then it is possible that the data are not contiguous.
200  // Check by seeing if the previous process ID in the list
201  // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
202  // because if starts[exportID] > 1, then i must be > 1 (since
203  // the starts array was filled with zeros initially).
204 
205  // null entries break continuity.
206  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
207  if (needSendBuff == 0 && starts[exportID] > 1 &&
208  exportID != exportProcIDs[i-1]) {
209  needSendBuff = 1;
210  }
211  ++numActive;
212  }
213  }
214 
215 #if defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
216  {
217  int global_needSendBuff;
218  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
219  outArg (global_needSendBuff));
220  TPETRA_EFFICIENCY_WARNING(
221  global_needSendBuff != 0,
222  "::createFromSends: Grouping export IDs together by process rank often "
223  "improves performance.");
224  }
225 #endif
226 
227  // Determine from the caller's data whether or not the current
228  // process should send (a) message(s) to itself.
229  if (starts[myProcID] != 0) {
230  sendMessageToSelf_ = true;
231  }
232  else {
233  sendMessageToSelf_ = false;
234  }
235 
236  if (! needSendBuff) {
237  // grouped by proc, no send buffer or indicesTo_ needed
238  numSendsToOtherProcs_ = 0;
239  // Count total number of sends, i.e., total number of procs to
240  // which we are sending. This includes myself, if applicable.
241  for (int i = 0; i < numProcs; ++i) {
242  if (starts[i]) {
243  ++numSendsToOtherProcs_;
244  }
245  }
246 
247  // Not only do we not need these, but we must clear them, as
248  // empty status of indicesTo is a flag used later.
249  indicesTo_.resize(0);
250  // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
251  // includes self sends. Set their values to zeros.
252  procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
253  startsTo_.assign(numSendsToOtherProcs_,0);
254  lengthsTo_.assign(numSendsToOtherProcs_,0);
255 
256  // set startsTo to the offset for each send (i.e., each proc ID)
257  // set procsTo to the proc ID for each send
258  // in interpreting this code, remember that we are assuming contiguity;
259  // that is why procIndex skips through the ranks
260  {
261  size_t procIndex = 0;
262  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
263  while (exportProcIDs[procIndex] < 0) {
264  ++procIndex; // skip all negative proc IDs
265  }
266  startsTo_[i] = procIndex;
267  int procID = exportProcIDs[procIndex];
268  procIdsToSendTo_[i] = procID;
269  procIndex += starts[procID];
270  }
271  }
272  // sort the startsTo and proc IDs together, in ascending order, according
273  // to proc IDs
274  if (numSendsToOtherProcs_ > 0) {
275  sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
276  }
277  // compute the maximum send length
278  maxSendLength_ = 0;
279  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
280  int procID = procIdsToSendTo_[i];
281  lengthsTo_[i] = starts[procID];
282  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
283  maxSendLength_ = lengthsTo_[i];
284  }
285  }
286  }
287  else {
288  // not grouped by proc, need send buffer and indicesTo_
289 
290  // starts[i] is the number of sends to proc i
291  // numActive equals number of sends total, \sum_i starts[i]
292 
293  // this loop starts at starts[1], so explicitly check starts[0]
294  if (starts[0] == 0 ) {
295  numSendsToOtherProcs_ = 0;
296  }
297  else {
298  numSendsToOtherProcs_ = 1;
299  }
300  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
301  im1=starts.begin();
302  i != starts.end(); ++i)
303  {
304  if (*i != 0) ++numSendsToOtherProcs_;
305  *i += *im1;
306  im1 = i;
307  }
308  // starts[i] now contains the number of exports to procs 0 through i
309 
310  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
311  i=starts.rbegin()+1;
312  i != starts.rend(); ++i)
313  {
314  *ip1 = *i;
315  ip1 = i;
316  }
317  starts[0] = 0;
318  // starts[i] now contains the number of exports to procs 0 through
319  // i-1, i.e., all procs before proc i
320 
321  indicesTo_.resize(numActive);
322 
323  for (size_t i = 0; i < numExports; ++i) {
324  if (exportProcIDs[i] >= 0) {
325  // record the offset to the sendBuffer for this export
326  indicesTo_[starts[exportProcIDs[i]]] = i;
327  // now increment the offset for this proc
328  ++starts[exportProcIDs[i]];
329  }
330  }
331  // our send buffer will contain the export data for each of the procs
332  // we communicate with, in order by proc id
333  // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
334  // indicesTo_ now maps each slot of that send buffer to the export that
335  // fills it: sendBuffer[k] holds the data for export indicesTo_[k]
336  // (see the worked example below)
337  //
338  // starts[i] once again contains the number of exports to
339  // procs 0 through i
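  // Worked example (illustrative): with numProcs = 3 and
  // exportProcIDs = {2, 0, 2, 1}, the counting pass gives starts = {1, 1, 2, 0},
  // the offsets become {0, 1, 2, 4}, and the fill loop above yields
  // indicesTo_ = {1, 3, 0, 2}: send-buffer slot 0 holds export 1 (bound for
  // proc 0), slot 1 holds export 3 (proc 1), and slots 2-3 hold exports 0
  // and 2 (proc 2).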
340  for (int proc = numProcs-1; proc != 0; --proc) {
341  starts[proc] = starts[proc-1];
342  }
343  starts.front() = 0;
344  starts[numProcs] = numActive;
345  //
346  // starts[proc] once again contains the number of exports to
347  // procs 0 through proc-1
348  // i.e., the start of my data in the sendBuffer
349 
350  // this contains invalid data at procs we don't care about, that is okay
351  procIdsToSendTo_.resize(numSendsToOtherProcs_);
352  startsTo_.resize(numSendsToOtherProcs_);
353  lengthsTo_.resize(numSendsToOtherProcs_);
354 
355  // for each group of sends/exports, record the destination proc,
356  // the length, and the offset for this send into the
357  // send buffer (startsTo_)
358  maxSendLength_ = 0;
359  size_t snd = 0;
360  for (int proc = 0; proc < numProcs; ++proc ) {
361  if (starts[proc+1] != starts[proc]) {
362  lengthsTo_[snd] = starts[proc+1] - starts[proc];
363  startsTo_[snd] = starts[proc];
364  // record max length for all off-proc sends
365  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
366  maxSendLength_ = lengthsTo_[snd];
367  }
368  procIdsToSendTo_[snd] = proc;
369  ++snd;
370  }
371  }
372  }
373 
374  if (sendMessageToSelf_) {
375  --numSendsToOtherProcs_;
376  }
377 
378  // Invert map to see what msgs are received and what length
379  computeReceives();
380 
381 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
382  initializeMpiAdvance();
383 #endif
384 
385  // createFromRecvs() calls createFromSends(), but will set
386  // howInitialized_ again after calling createFromSends().
387  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
388 
389  return totalReceiveLength_;
390 }
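
A hedged end-to-end sketch of driving createFromSends (illustrative only, not taken from Tpetra's tests): each rank exports one entry to the next rank and one to the previous rank, and the return value is the number of entries the rank should expect to receive. DistributorPlan is an implementation detail that Tpetra::Distributor normally manages; constructing it directly is just for demonstration.

#include <iostream>
#include "Teuchos_Array.hpp"
#include "Tpetra_Core.hpp"
#include "Tpetra_Details_DistributorPlan.hpp"

int main (int argc, char* argv[]) {
  // ScopeGuard initializes and finalizes MPI (if enabled) and Kokkos.
  Tpetra::ScopeGuard tpetraScope (&argc, &argv);
  {
    auto comm = Tpetra::getDefaultComm ();
    const int myRank = comm->getRank ();
    const int numProcs = comm->getSize ();

    // One export to the "next" rank and one to the "previous" rank (with
    // wrap-around).  Each destination appears in a single contiguous block,
    // so no intermediate send buffer is required.
    Teuchos::Array<int> exportProcIDs (2);
    exportProcIDs[0] = (myRank + 1) % numProcs;
    exportProcIDs[1] = (myRank + numProcs - 1) % numProcs;

    Tpetra::Details::DistributorPlan plan (comm);
    const size_t numImports = plan.createFromSends (exportProcIDs ());

    // Every rank is the "next" of one rank and the "previous" of another,
    // so numImports should be 2 on every rank.
    std::cout << "Rank " << myRank << " expects " << numImports
              << " imported entries." << std::endl;
  }
  return 0;
}
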
391 
392 void DistributorPlan::createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs)
393 {
394  createFromSends(remoteProcIDs);
395  *this = *getReversePlan();
396  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
397 }
398 
399 void DistributorPlan::createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
400  const Teuchos::ArrayView<const int>& remoteProcIDs)
401 {
402  // note that exportProcIDs and remoteProcIDs _must_ be lists that have
403  // an entry for each GID. If the export/remote proc ID lists are taken from
404  // the getProcs{From|To} lists extracted from a previous distributor,
405  // the result will be wrong, because those lists have a unique entry
406  // for each processor id. A version of this that takes lengthsTo and
407  // lengthsFrom should be made.
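  // For example (illustrative): a process that exports three entries to
  // rank 1 and two entries to rank 4 must pass exportProcIDs = {1, 1, 1, 4, 4};
  // the deduplicated getProcs{From|To}-style list {1, 4} would give the
  // wrong lengths.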
408 
409  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
410 
411  int myProcID = comm_->getRank ();
412  int numProcs = comm_->getSize();
413 
414  const size_t numExportIDs = exportProcIDs.size();
415  Teuchos::Array<size_t> starts (numProcs + 1, 0);
416 
417  size_t numActive = 0;
418  int needSendBuff = 0; // Boolean
419 
420  for(size_t i = 0; i < numExportIDs; i++ )
421  {
422  if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
423  needSendBuff = 1;
424  if( exportProcIDs[i] >= 0 )
425  {
426  ++starts[ exportProcIDs[i] ];
427  ++numActive;
428  }
429  }
430 
431  sendMessageToSelf_ = ( starts[myProcID] != 0 ) ? 1 : 0;
432 
433  numSendsToOtherProcs_ = 0;
434 
435  if( needSendBuff ) // not grouped by proc; a send buffer and indicesTo_ are needed
436  {
437  if (starts[0] == 0 ) {
438  numSendsToOtherProcs_ = 0;
439  }
440  else {
441  numSendsToOtherProcs_ = 1;
442  }
443  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
444  im1=starts.begin();
445  i != starts.end(); ++i)
446  {
447  if (*i != 0) ++numSendsToOtherProcs_;
448  *i += *im1;
449  im1 = i;
450  }
451  // starts[i] now contains the number of exports to procs 0 through i
452 
453  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
454  i=starts.rbegin()+1;
455  i != starts.rend(); ++i)
456  {
457  *ip1 = *i;
458  ip1 = i;
459  }
460  starts[0] = 0;
461  // starts[i] now contains the number of exports to procs 0 through
462  // i-1, i.e., all procs before proc i
463 
464  indicesTo_.resize(numActive);
465 
466  for (size_t i = 0; i < numExportIDs; ++i) {
467  if (exportProcIDs[i] >= 0) {
468  // record the offset to the sendBuffer for this export
469  indicesTo_[starts[exportProcIDs[i]]] = i;
470  // now increment the offset for this proc
471  ++starts[exportProcIDs[i]];
472  }
473  }
474  for (int proc = numProcs-1; proc != 0; --proc) {
475  starts[proc] = starts[proc-1];
476  }
477  starts.front() = 0;
478  starts[numProcs] = numActive;
479  procIdsToSendTo_.resize(numSendsToOtherProcs_);
480  startsTo_.resize(numSendsToOtherProcs_);
481  lengthsTo_.resize(numSendsToOtherProcs_);
482  maxSendLength_ = 0;
483  size_t snd = 0;
484  for (int proc = 0; proc < numProcs; ++proc ) {
485  if (starts[proc+1] != starts[proc]) {
486  lengthsTo_[snd] = starts[proc+1] - starts[proc];
487  startsTo_[snd] = starts[proc];
488  // record max length for all off-proc sends
489  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
490  maxSendLength_ = lengthsTo_[snd];
491  }
492  procIdsToSendTo_[snd] = proc;
493  ++snd;
494  }
495  }
496  }
497  else {
498  // grouped by proc, no send buffer or indicesTo_ needed
499  numSendsToOtherProcs_ = 0;
500  // Count total number of sends, i.e., total number of procs to
501  // which we are sending. This includes myself, if applicable.
502  for (int i = 0; i < numProcs; ++i) {
503  if (starts[i]) {
504  ++numSendsToOtherProcs_;
505  }
506  }
507 
508  // Not only do we not need these, but we must clear them, as
509  // empty status of indicesTo is a flag used later.
510  indicesTo_.resize(0);
511  // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
512  // includes self sends. Set their values to zeros.
513  procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
514  startsTo_.assign(numSendsToOtherProcs_,0);
515  lengthsTo_.assign(numSendsToOtherProcs_,0);
516 
517  // set startsTo to the offset for each send (i.e., each proc ID)
518  // set procsTo to the proc ID for each send
519  // in interpreting this code, remember that we are assuming contiguity;
520  // that is why procIndex skips through the ranks
521  {
522  size_t procIndex = 0;
523  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
524  while (exportProcIDs[procIndex] < 0) {
525  ++procIndex; // skip all negative proc IDs
526  }
527  startsTo_[i] = procIndex;
528  int procID = exportProcIDs[procIndex];
529  procIdsToSendTo_[i] = procID;
530  procIndex += starts[procID];
531  }
532  }
533  // sort the startsTo and proc IDs together, in ascending order, according
534  // to proc IDs
535  if (numSendsToOtherProcs_ > 0) {
536  sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
537  }
538  // compute the maximum send length
539  maxSendLength_ = 0;
540  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
541  int procID = procIdsToSendTo_[i];
542  lengthsTo_[i] = starts[procID];
543  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
544  maxSendLength_ = lengthsTo_[i];
545  }
546  }
547  }
548 
549 
550  numSendsToOtherProcs_ -= sendMessageToSelf_;
551  std::vector<int> recv_list;
552  recv_list.reserve(numSendsToOtherProcs_); //reserve an initial guess for size needed
553 
554  int last_pid=-2;
555  for(int i=0; i<remoteProcIDs.size(); i++) {
556  if(remoteProcIDs[i]>last_pid) {
557  recv_list.push_back(remoteProcIDs[i]);
558  last_pid = remoteProcIDs[i];
559  }
560  else if (remoteProcIDs[i]<last_pid)
561  throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected remoteProcIDs to be in sorted order");
562  }
563  numReceives_ = recv_list.size();
564  if(numReceives_) {
565  procsFrom_.assign(numReceives_,0);
566  lengthsFrom_.assign(numReceives_,0);
567  indicesFrom_.assign(numReceives_,0);
568  startsFrom_.assign(numReceives_,0);
569  }
570  for(size_t i=0,j=0; i<numReceives_; ++i) {
571  int jlast=j;
572  procsFrom_[i] = recv_list[i];
573  startsFrom_[i] = j;
574  for( ; j<(size_t)remoteProcIDs.size() &&
575  remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
576  lengthsFrom_[i] = j-jlast;
577  }
578  totalReceiveLength_ = remoteProcIDs.size();
579  indicesFrom_.clear ();
580  numReceives_-=sendMessageToSelf_;
581 
582 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
583  initializeMpiAdvance();
584 #endif
585 }
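
A fragment-level sketch of the inputs createFromSendsAndRecvs expects (ranks and counts are made up, and comm is assumed to be a Teuchos communicator obtained elsewhere). Both lists carry one entry per element, and remoteProcIDs must already be sorted by source rank, or the std::runtime_error above is thrown.

Teuchos::Array<int> exportProcIDs;   // destination rank of each exported entry
exportProcIDs.append (1);
exportProcIDs.append (1);
exportProcIDs.append (3);

Teuchos::Array<int> remoteProcIDs;   // source rank of each received entry, sorted
remoteProcIDs.append (0);
remoteProcIDs.append (0);
remoteProcIDs.append (2);

Tpetra::Details::DistributorPlan plan (comm);
plan.createFromSendsAndRecvs (exportProcIDs (), remoteProcIDs ());
// Unlike createFromSends, the receive side comes directly from remoteProcIDs
// instead of being discovered through computeReceives().
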
586 
587 Teuchos::RCP<DistributorPlan> DistributorPlan::getReversePlan() const {
588  if (reversePlan_.is_null()) createReversePlan();
589  return reversePlan_;
590 }
591 
592 void DistributorPlan::createReversePlan() const
593 {
594  reversePlan_ = Teuchos::rcp(new DistributorPlan(comm_));
595  reversePlan_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
596  reversePlan_->sendType_ = sendType_;
597 
598  // The total length of all the sends of this DistributorPlan. We
599  // calculate it because it's the total length of all the receives
600  // of the reverse DistributorPlan.
601  size_t totalSendLength =
602  std::accumulate(lengthsTo_.begin(), lengthsTo_.end(), size_t(0)); // size_t init avoids int accumulation
603 
604  // The maximum length of any of the receives of this DistributorPlan.
605  // We calculate it because it's the maximum length of any of the
606  // sends of the reverse DistributorPlan.
607  size_t maxReceiveLength = 0;
608  const int myProcID = comm_->getRank();
609  for (size_t i=0; i < numReceives_; ++i) {
610  if (procsFrom_[i] != myProcID) {
611  // Don't count receives for messages sent by myself to myself.
612  if (lengthsFrom_[i] > maxReceiveLength) {
613  maxReceiveLength = lengthsFrom_[i];
614  }
615  }
616  }
617 
618  reversePlan_->sendMessageToSelf_ = sendMessageToSelf_;
619  reversePlan_->numSendsToOtherProcs_ = numReceives_;
620  reversePlan_->procIdsToSendTo_ = procsFrom_;
621  reversePlan_->startsTo_ = startsFrom_;
622  reversePlan_->lengthsTo_ = lengthsFrom_;
623  reversePlan_->maxSendLength_ = maxReceiveLength;
624  reversePlan_->indicesTo_ = indicesFrom_;
625  reversePlan_->numReceives_ = numSendsToOtherProcs_;
626  reversePlan_->totalReceiveLength_ = totalSendLength;
627  reversePlan_->lengthsFrom_ = lengthsTo_;
628  reversePlan_->procsFrom_ = procIdsToSendTo_;
629  reversePlan_->startsFrom_ = startsTo_;
630  reversePlan_->indicesFrom_ = indicesTo_;
631 
632 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
633  // is there a smarter way to do this
634  reversePlan_->initializeMpiAdvance();
635 #endif
636 }
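
A short sketch of retrieving the cached reverse plan (the plan object is assumed from context). Because each send-side member above is mirrored into the corresponding receive-side member and vice versa, the reverse plan routes replies back along the original pattern.

Teuchos::RCP<Tpetra::Details::DistributorPlan> reverse = plan.getReversePlan ();
// `reverse` sends to exactly the ranks that `plan` receives from, and
// receives from exactly the ranks that `plan` sends to.
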
637 
638 void DistributorPlan::computeReceives()
639 {
640  using Teuchos::Array;
641  using Teuchos::ArrayRCP;
642  using Teuchos::as;
643  using Teuchos::CommStatus;
644  using Teuchos::CommRequest;
645  using Teuchos::ireceive;
646  using Teuchos::RCP;
647  using Teuchos::rcp;
648  using Teuchos::REDUCE_SUM;
649  using Teuchos::receive;
650  using Teuchos::reduce;
651  using Teuchos::scatter;
652  using Teuchos::send;
653  using Teuchos::waitAll;
654 
655  const int myRank = comm_->getRank();
656  const int numProcs = comm_->getSize();
657 
658  const int mpiTag = DEFAULT_MPI_TAG;
659 
660  // toProcsFromMe[i] == the number of messages sent by this process
661  // to process i. The data in numSendsToOtherProcs_, procIdsToSendTo_, and lengthsTo_
662  // concern the contiguous sends. Therefore, each process will be
663  // listed in procIdsToSendTo_ at most once, and so toProcsFromMe[i] will
664  // either be 0 or 1.
665  {
666  Array<int> toProcsFromMe (numProcs, 0);
667 #ifdef HAVE_TPETRA_DEBUG
668  bool counting_error = false;
669 #endif // HAVE_TPETRA_DEBUG
670  for (size_t i = 0; i < (numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0)); ++i) {
671 #ifdef HAVE_TPETRA_DEBUG
672  if (toProcsFromMe[procIdsToSendTo_[i]] != 0) {
673  counting_error = true;
674  }
675 #endif // HAVE_TPETRA_DEBUG
676  toProcsFromMe[procIdsToSendTo_[i]] = 1;
677  }
678 #ifdef HAVE_TPETRA_DEBUG
679  // Note that SHARED_TEST_FOR_EXCEPTION does a global reduction
680  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
681  "Tpetra::Distributor::computeReceives: There was an error on at least "
682  "one process in counting the number of messages send by that process to "
683  "the other processs. Please report this bug to the Tpetra developers.",
684  *comm_);
685 #endif // HAVE_TPETRA_DEBUG
686 
687  // Compute the number of receives that this process needs to
688  // post. The number of receives includes any self sends (i.e.,
689  // messages sent by this process to itself).
690  //
691  // (We will use numReceives_ below to post exactly that
692  // number of receives, with MPI_ANY_SOURCE as the sending rank.
693  // This will tell us from which processes this process expects
694  // to receive, and how many packets of data we expect to receive
695  // from each process.)
696  //
697  // toProcsFromMe[i] is the number of messages sent by this
698  // process to process i. Compute the sum (elementwise) of all
699  // the toProcsFromMe arrays on all processes in the
700  // communicator. If the array x is that sum, then if this
701  // process has rank j, x[j] is the number of messages sent
702  // to process j, that is, the number of receives on process j
703  // (including any messages sent by process j to itself).
704  //
705  // Yes, this requires storing and operating on an array of
706  // length P, where P is the number of processes in the
707  // communicator. Epetra does this too. Avoiding this O(P)
708  // memory bottleneck would require some research.
709  //
710  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
711  // implement this O(P) memory algorithm.
712  //
713  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
714  // process (0) from toProcsFromMe, to numRecvsOnEachProc.
715  // Then, scatter the latter, so that each process p gets
716  // numRecvsOnEachProc[p].
717  //
718  // 2. Like #1, but use MPI_Reduce_scatter instead of
719  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
720  // optimized to reduce the number of messages, but
721  // MPI_Reduce_scatter is more general than we need (it
722  // allows the equivalent of MPI_Scatterv). See Bug 6336.
723  //
724  // 3. Do an all-reduce on toProcsFromMe, and let my process
725  // (with rank myRank) get numReceives_ from
726  // toProcsFromMe[myRank]. The HPCCG miniapp uses the
727  // all-reduce method.
728  //
729  // Approaches 1 and 3 have the same critical path length.
730  // However, #3 moves more data. This is because the final
731  // result is just one integer, but #3 moves a whole array of
732  // results to all the processes. This is why we use Approach 1
733  // here.
734  //
735  // mfh 12 Apr 2013: See discussion in createFromSends() about
736  // how we could use this communication to propagate an error
737  // flag for "free" in a release build.
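  // A small illustration (3 ranks):
  //   toProcsFromMe on rank 0 = {0, 1, 1}   (rank 0 sends to ranks 1 and 2)
  //   toProcsFromMe on rank 1 = {1, 0, 1}   (rank 1 sends to ranks 0 and 2)
  //   toProcsFromMe on rank 2 = {0, 1, 0}   (rank 2 sends to rank 1)
  // The element-wise sum on the root is {1, 2, 2}; after the scatter,
  // rank 0 learns it has 1 receive and ranks 1 and 2 learn they have 2 each.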
738 
739  const int root = 0; // rank of root process of the reduction
740  Array<int> numRecvsOnEachProc; // temp; only needed on root
741  if (myRank == root) {
742  numRecvsOnEachProc.resize (numProcs);
743  }
744  int numReceivesAsInt = 0; // output
745  reduce<int, int> (toProcsFromMe.getRawPtr (),
746  numRecvsOnEachProc.getRawPtr (),
747  numProcs, REDUCE_SUM, root, *comm_);
748  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
749  &numReceivesAsInt, 1, root, *comm_);
750  numReceives_ = static_cast<size_t> (numReceivesAsInt);
751  }
752 
753  // Now we know numReceives_, which is this process' number of
754  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
755  // with this number of entries.
756  lengthsFrom_.assign (numReceives_, 0);
757  procsFrom_.assign (numReceives_, 0);
758 
759  //
760  // Ask (via nonblocking receive) each process from which we are
761  // receiving how many packets we should expect from it in the
762  // communication pattern.
763  //
764 
765  // At this point, numReceives_ includes any self message that
766  // there may be. At the end of this routine, we'll subtract off
767  // the self message (if there is one) from numReceives_. In this
768  // routine, we don't need to receive a message from ourselves in
769  // order to figure out our lengthsFrom_ and source process ID; we
770  // can just ask ourselves directly. Thus, the actual number of
771  // nonblocking receives we post here does not include the self
772  // message.
773  const size_t actualNumReceives = numReceives_ - (sendMessageToSelf_ ? 1 : 0);
774 
775  // Teuchos' wrapper for nonblocking receives requires receive
776  // buffers that it knows won't go away. This is why we use RCPs,
777  // one RCP per nonblocking receive request. They get allocated in
778  // the loop below.
779  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
780  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
781  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
782 
783  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
784  // (receive data from any process).
785 #ifdef HAVE_MPI
786  const int anySourceProc = MPI_ANY_SOURCE;
787 #else
788  const int anySourceProc = -1;
789 #endif
790 
791  // Post the (nonblocking) receives.
792  for (size_t i = 0; i < actualNumReceives; ++i) {
793  // Once the receive completes, we can ask the corresponding
794  // CommStatus object (output by wait()) for the sending process'
795  // ID (which we'll assign to procsFrom_[i] -- don't forget to
796  // do that!).
797  lengthsFromBuffers[i].resize (1);
798  lengthsFromBuffers[i][0] = as<size_t> (0);
799  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
800  mpiTag, *comm_);
801  }
802 
803  // Post the sends: Tell each process to which we are sending how
804  // many packets it should expect from us in the communication
805  // pattern. We could use nonblocking sends here, as long as we do
806  // a waitAll() on all the sends and receives at once.
807  //
808  // We assume that numSendsToOtherProcs_ and sendMessageToSelf_ have already been
809  // set. The value of numSendsToOtherProcs_ (my process' number of sends) does
810  // not include any message that it might send to itself.
811  for (size_t i = 0; i < numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0); ++i) {
812  if (procIdsToSendTo_[i] != myRank) {
813  // Send a message to procIdsToSendTo_[i], telling that process that
814  // this communication pattern will send that process
815  // lengthsTo_[i] blocks of packets.
816  const size_t* const lengthsTo_i = &lengthsTo_[i];
817  send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), mpiTag, *comm_);
818  }
819  else {
820  // We don't need a send in the self-message case. If this
821  // process will send a message to itself in the communication
822  // pattern, then the last element of lengthsFrom_ and
823  // procsFrom_ corresponds to the self-message. Of course
824  // this process knows how long the message is, and the process
825  // ID is its own process ID.
826  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
827  procsFrom_[numReceives_-1] = myRank;
828  }
829  }
830 
831  //
832  // Wait on all the receives. When they arrive, check the status
833  // output of wait() for the sending process' ID, unpack the
834  // request buffers into lengthsFrom_, and set procsFrom_ from the
835  // status.
836  //
837  waitAll (*comm_, requests (), statuses ());
838  for (size_t i = 0; i < actualNumReceives; ++i) {
839  lengthsFrom_[i] = *lengthsFromBuffers[i];
840  procsFrom_[i] = statuses[i]->getSourceRank ();
841  }
842 
843  // Sort the procsFrom_ array, and apply the same permutation to
844  // lengthsFrom_. This ensures that procsFrom_[i] and
845  // lengthsFrom_[i] refer to the same process.
846  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
847 
848  // Compute indicesFrom_
849  totalReceiveLength_ =
850  std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), size_t (0)); // size_t init avoids int accumulation
851  indicesFrom_.clear ();
852 
853  startsFrom_.clear ();
854  startsFrom_.reserve (numReceives_);
855  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
856  startsFrom_.push_back(j);
857  j += lengthsFrom_[i];
858  }
859 
860  if (sendMessageToSelf_) {
861  --numReceives_;
862  }
863 }
864 
865 void DistributorPlan::setParameterList(const Teuchos::RCP<Teuchos::ParameterList>& plist)
866 {
867  using Teuchos::FancyOStream;
868  using Teuchos::getIntegralValue;
869  using Teuchos::ParameterList;
870  using Teuchos::parameterList;
871  using Teuchos::RCP;
872  using std::endl;
873 
874  if (! plist.is_null()) {
875  RCP<const ParameterList> validParams = getValidParameters ();
876  plist->validateParametersAndSetDefaults (*validParams);
877 
878  const Details::EDistributorSendType sendType =
879  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
880 
881  // Now that we've validated the input list, save the results.
882  sendType_ = sendType;
883 
884  // ParameterListAcceptor semantics require pointer identity of the
885  // sublist passed to setParameterList(), so we save the pointer.
886  this->setMyParamList (plist);
887  }
888 }
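
A small sketch of selecting the send variant at run time (the plan object is assumed from context). "Isend" must be one of the strings returned by distributorSendTypes() below; any other value fails validation against getValidParameters().

Teuchos::RCP<Teuchos::ParameterList> params = Teuchos::parameterList ();
params->set ("Send type", "Isend");   // use nonblocking sends in do[Reverse]Posts()
plan.setParameterList (params);       // validates the list, then stores it
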
889 
890 Teuchos::Array<std::string> distributorSendTypes()
891 {
892  Teuchos::Array<std::string> sendTypes;
893  sendTypes.push_back ("Isend");
894  sendTypes.push_back ("Send");
895  sendTypes.push_back ("Alltoall");
896 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
897  sendTypes.push_back ("MpiAdvanceAlltoall");
898  sendTypes.push_back ("MpiAdvanceNbralltoallv");
899 #endif
900  return sendTypes;
901 }
902 
903 Teuchos::RCP<const Teuchos::ParameterList>
904 DistributorPlan::getValidParameters() const
905 {
906  using Teuchos::Array;
907  using Teuchos::ParameterList;
908  using Teuchos::parameterList;
909  using Teuchos::RCP;
910  using Teuchos::setStringToIntegralParameter;
911 
912  Array<std::string> sendTypes = distributorSendTypes ();
913  const std::string defaultSendType ("Send");
914  Array<Details::EDistributorSendType> sendTypeEnums;
915  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
916  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
917  sendTypeEnums.push_back (Details::DISTRIBUTOR_ALLTOALL);
918 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
919  sendTypeEnums.push_back (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL);
920  sendTypeEnums.push_back (Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV);
921 #endif
922 
923  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
924 
925  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
926  defaultSendType, "When using MPI, the variant of send to use in "
927  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
928  plist->set ("Timer Label","","Label for Time Monitor output");
929 
930  return Teuchos::rcp_const_cast<const ParameterList> (plist);
931 }
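
A companion sketch: inspecting the accepted parameters and their defaults before building such a list (again illustrative; assumes <iostream> and a plan object from context).

Teuchos::RCP<const Teuchos::ParameterList> validParams = plan.getValidParameters ();
validParams->print (std::cout);   // shows "Send type" (with its valid strings) and "Timer Label"
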
932 
933 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
934 
935 // Used by Teuchos::RCP to clean up an owned MPIX_Comm*
936 struct MpixCommDeallocator {
937  void free(MPIX_Comm **comm) const {
938  MPIX_Comm_free(*comm);
939  }
940 };
941 
942 void DistributorPlan::initializeMpiAdvance() {
943 
944  // The MPIX communicator must not have been created yet.
945  TEUCHOS_ASSERT(mpixComm_.is_null());
946 
947  // use the members to initialize the graph for neighborhood mode, or just the MPIX communicator for non-neighborhood mode
948  Teuchos::RCP<const Teuchos::MpiComm<int> > mpiComm = Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int> >(comm_);
949  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm> > rawComm = mpiComm->getRawMpiComm();
950  int err = 0;
951  if (sendType_ == DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
952  MPIX_Comm **mpixComm = new(MPIX_Comm*);
953  err = MPIX_Comm_init(mpixComm, (*rawComm)());
954  mpixComm_ = Teuchos::RCP(mpixComm,
955  MpixCommDeallocator(),
956  true /*take ownership*/
957  );
958  }
959  else if (sendType_ == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
960  int numRecvs = (int)(numReceives_ + (sendMessageToSelf_ ? 1 : 0));
961  int *sourceRanks = procsFrom_.data();
962 
963  // int *sourceWeights = static_cast<int*>(lengthsFrom_.data());// lengthsFrom_ may not be int
964  const int *sourceWeights = MPI_UNWEIGHTED;
965  int numSends = (int)(numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0));
966  int *destRanks = procIdsToSendTo_.data();
967 
968  // int *destWeights = static_cast<int*>(lengthsTo_.data()); // lengthsTo_ may not be int
969  const int *destWeights = MPI_UNWEIGHTED; // lengthsTo_ may not be int
970 
971  MPIX_Comm **mpixComm = new(MPIX_Comm*);
972  err = MPIX_Dist_graph_create_adjacent((*rawComm)(), numRecvs, sourceRanks, sourceWeights, numSends, destRanks, destWeights, MPI_INFO_NULL, false, mpixComm);
973  mpixComm_ = Teuchos::RCP(mpixComm,
974  MpixCommDeallocator(),
975  true /*take ownership*/
976  );
977  }
978 
979  TEUCHOS_ASSERT(err == 0);
980 }
981 #endif
982 
983 } // namespace Details
984 } // namespace Tpetra