Tpetra parallel linear algebra  Version of the Day
Tpetra_Details_DistributorPlan.cpp
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 #include "Tpetra_Details_DistributorPlan.hpp"
11 
12 #include "Teuchos_StandardParameterEntryValidators.hpp"
13 #include "Tpetra_Util.hpp"
14 #include "Tpetra_Details_Behavior.hpp"
15 #include <numeric>
16 
17 namespace Tpetra {
18 namespace Details {
19 
20 std::string
21 DistributorSendTypeEnumToString (EDistributorSendType sendType)
22 {
23  if (sendType == DISTRIBUTOR_ISEND) {
24  return "Isend";
25  }
26  else if (sendType == DISTRIBUTOR_SEND) {
27  return "Send";
28  }
29  else if (sendType == DISTRIBUTOR_ALLTOALL) {
30  return "Alltoall";
31  }
32 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
33  else if (sendType == DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
34  return "MpiAdvanceAlltoall";
35  }
36  else if (sendType == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
37  return "MpiAdvanceNbralltoallv";
38  }
39 #endif
40  else {
41  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
42  "EDistributorSendType enum value " << sendType << ".");
43  }
44 }
45 
46 std::string
47 DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
48 {
49  switch (how) {
50  case Details::DISTRIBUTOR_NOT_INITIALIZED:
51  return "Not initialized yet";
52  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
53  return "By createFromSends";
54  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
55  return "By createFromRecvs";
56  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
57  return "By createFromSendsAndRecvs";
58  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
59  return "By createReverseDistributor";
60  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
61  return "By copy constructor";
62  default:
63  return "INVALID";
64  }
65 }
66 
67 DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
68  : comm_(comm),
69 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
70  mpixComm_(Teuchos::null),
71 #endif
72  howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
73  reversePlan_(Teuchos::null),
74  sendType_(DISTRIBUTOR_SEND),
75  sendMessageToSelf_(false),
76  numSendsToOtherProcs_(0),
77  maxSendLength_(0),
78  numReceives_(0),
79  totalReceiveLength_(0)
80 { }
81 
82 DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
83  : comm_(otherPlan.comm_),
84 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
85  mpixComm_(otherPlan.mpixComm_),
86 #endif
87  howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
88  reversePlan_(otherPlan.reversePlan_),
89  sendType_(otherPlan.sendType_),
90  sendMessageToSelf_(otherPlan.sendMessageToSelf_),
91  numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
92  procIdsToSendTo_(otherPlan.procIdsToSendTo_),
93  startsTo_(otherPlan.startsTo_),
94  lengthsTo_(otherPlan.lengthsTo_),
95  maxSendLength_(otherPlan.maxSendLength_),
96  indicesTo_(otherPlan.indicesTo_),
97  numReceives_(otherPlan.numReceives_),
98  totalReceiveLength_(otherPlan.totalReceiveLength_),
99  lengthsFrom_(otherPlan.lengthsFrom_),
100  procsFrom_(otherPlan.procsFrom_),
101  startsFrom_(otherPlan.startsFrom_),
102  indicesFrom_(otherPlan.indicesFrom_)
103 { }
104 
105 size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs) {
106  using Teuchos::outArg;
107  using Teuchos::REDUCE_MAX;
108  using Teuchos::reduceAll;
109  using std::endl;
110  const char rawPrefix[] = "Tpetra::DistributorPlan::createFromSends";
111 
112  const size_t numExports = exportProcIDs.size();
113  const int myProcID = comm_->getRank();
114  const int numProcs = comm_->getSize();
115  const bool debug = Details::Behavior::debug("Distributor");
116 
117  // exportProcIDs tells us the communication pattern for this
118  // distributor. It dictates the way that the export data will be
119  // interpreted in doPosts(). We want to perform at most one
120  // send per process in doPosts; this is for two reasons:
121  // * minimize latency / overhead in the comm routines (nice)
122  // * match the number of receives and sends between processes
123  // (necessary)
124  //
125  // Teuchos::Comm requires that the data for a send are contiguous
126  // in a send buffer. Therefore, if the data in the send buffer
127  // for doPosts() are not contiguous, they will need to be copied
128  // into a contiguous buffer. The user has specified this
129  // noncontiguous pattern and we can't do anything about it.
130  // However, if they do not provide an efficient pattern, we will
 131  // warn them if the following compile-time option has been
132  // set:
133  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
134  //
135  // If the data are contiguous, then we can post the sends in situ
136  // (i.e., without needing to copy them into a send buffer).
137  //
138  // Determine contiguity. There are a number of ways to do this:
139  // * If the export IDs are sorted, then all exports to a
140  // particular proc must be contiguous. This is what Epetra does.
141  // * If the export ID of the current export already has been
142  // listed, then the previous listing should correspond to the
143  // same export. This tests contiguity, but not sortedness.
144  //
145  // Both of these tests require O(n), where n is the number of
146  // exports. However, the latter will positively identify a greater
147  // portion of contiguous patterns. We use the latter method.
148  //
149  // Check to see if values are grouped by procs without gaps
150  // If so, indices_to -> 0.
151 
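 // Editorial illustration (not part of the original source): two made-up
 // export lists showing the two cases the contiguity check below
 // distinguishes.
 //   exportProcIDs = [2, 2, 0, 0, 3]  -> grouped by proc; sends can be
 //                                       posted in place and indicesTo_
 //                                       stays empty
 //   exportProcIDs = [2, 0, 2, 0, 3]  -> proc 2 reappears after proc 0, so
 //                                       the data are noncontiguous and a
 //                                       send buffer (indicesTo_) is needed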
152  if (debug) {
153  // Test whether any process in the communicator got an invalid
154  // process ID. If badID != -1 on this process, then it equals
155  // this process' rank. The max of all badID over all processes
156  // is the max rank which has an invalid process ID.
157  int badID = -1;
158  for (size_t i = 0; i < numExports; ++i) {
159  const int exportID = exportProcIDs[i];
160  if (exportID >= numProcs || exportID < 0) {
161  badID = myProcID;
162  break;
163  }
164  }
165  int gbl_badID;
166  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
167  TEUCHOS_TEST_FOR_EXCEPTION
168  (gbl_badID >= 0, std::runtime_error, rawPrefix << "Proc "
169  << gbl_badID << ", perhaps among other processes, got a bad "
170  "send process ID.");
171  }
172 
173  // Set up data structures for quick traversal of arrays.
174  // This contains the number of sends for each process ID.
175  //
176  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
177  // that create an array of length the number of processes in the
178  // communicator (plus one). Given how this code uses this array,
179  // it should be straightforward to replace it with a hash table or
180  // some other more space-efficient data structure. In practice,
181  // most of the entries of starts should be zero for a sufficiently
182  // large process count, unless the communication pattern is dense.
183  // Note that it's important to be able to iterate through keys (i
184  // for which starts[i] is nonzero) in increasing order.
185  Teuchos::Array<size_t> starts (numProcs + 1, 0);
186 
187  // numActive is the number of sends that are not Null
188  size_t numActive = 0;
189  int needSendBuff = 0; // Boolean
190 
191  for (size_t i = 0; i < numExports; ++i) {
192  const int exportID = exportProcIDs[i];
193  if (exportID >= 0) {
194  // exportID is a valid process ID. Increment the number of
195  // messages this process will send to that process.
196  ++starts[exportID];
197 
198  // If we're sending more than one message to process exportID,
199  // then it is possible that the data are not contiguous.
200  // Check by seeing if the previous process ID in the list
201  // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
202  // because if starts[exportID] > 1, then i must be > 1 (since
203  // the starts array was filled with zeros initially).
204 
205  // null entries break continuity.
206  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
207  if (needSendBuff == 0 && starts[exportID] > 1 &&
208  exportID != exportProcIDs[i-1]) {
209  needSendBuff = 1;
210  }
211  ++numActive;
212  }
213  }
214 
215 #if defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
216  {
217  int global_needSendBuff;
218  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
219  outArg (global_needSendBuff));
 220  TPETRA_EFFICIENCY_WARNING(
 221  global_needSendBuff != 0,
222  "::createFromSends: Grouping export IDs together by process rank often "
223  "improves performance.");
224  }
225 #endif
226 
227  // Determine from the caller's data whether or not the current
228  // process should send (a) message(s) to itself.
229  if (starts[myProcID] != 0) {
230  sendMessageToSelf_ = true;
231  }
232  else {
233  sendMessageToSelf_ = false;
234  }
235 
236  if (! needSendBuff) {
237  // grouped by proc, no send buffer or indicesTo_ needed
238  numSendsToOtherProcs_ = 0;
239  // Count total number of sends, i.e., total number of procs to
240  // which we are sending. This includes myself, if applicable.
241  for (int i = 0; i < numProcs; ++i) {
242  if (starts[i]) {
243  ++numSendsToOtherProcs_;
244  }
245  }
246 
247  // Not only do we not need these, but we must clear them, as
248  // empty status of indicesTo is a flag used later.
249  indicesTo_.resize(0);
250  // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
251  // includes self sends. Set their values to zeros.
252  procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
253  startsTo_.assign(numSendsToOtherProcs_,0);
254  lengthsTo_.assign(numSendsToOtherProcs_,0);
255 
 256  // Set startsTo_ to the offset for each send (i.e., for each proc ID),
 257  // and procIdsToSendTo_ to the proc ID for each send. In interpreting
 258  // this code, remember that we are assuming contiguity; that is why
 259  // procIndex skips through the ranks.
260  {
261  size_t procIndex = 0;
262  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
263  while (exportProcIDs[procIndex] < 0) {
264  ++procIndex; // skip all negative proc IDs
265  }
266  startsTo_[i] = procIndex;
267  int procID = exportProcIDs[procIndex];
268  procIdsToSendTo_[i] = procID;
269  procIndex += starts[procID];
270  }
271  }
272  // sort the startsTo and proc IDs together, in ascending order, according
273  // to proc IDs
274  if (numSendsToOtherProcs_ > 0) {
275  sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
276  }
277  // compute the maximum send length
278  maxSendLength_ = 0;
279  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
280  int procID = procIdsToSendTo_[i];
281  lengthsTo_[i] = starts[procID];
282  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
283  maxSendLength_ = lengthsTo_[i];
284  }
285  }
286  }
287  else {
288  // not grouped by proc, need send buffer and indicesTo_
289 
290  // starts[i] is the number of sends to proc i
291  // numActive equals number of sends total, \sum_i starts[i]
292 
293  // this loop starts at starts[1], so explicitly check starts[0]
294  if (starts[0] == 0 ) {
295  numSendsToOtherProcs_ = 0;
296  }
297  else {
298  numSendsToOtherProcs_ = 1;
299  }
300  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
301  im1=starts.begin();
302  i != starts.end(); ++i)
303  {
304  if (*i != 0) ++numSendsToOtherProcs_;
305  *i += *im1;
306  im1 = i;
307  }
308  // starts[i] now contains the number of exports to procs 0 through i
309 
310  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
311  i=starts.rbegin()+1;
312  i != starts.rend(); ++i)
313  {
314  *ip1 = *i;
315  ip1 = i;
316  }
317  starts[0] = 0;
318  // starts[i] now contains the number of exports to procs 0 through
319  // i-1, i.e., all procs before proc i
320 
321  indicesTo_.resize(numActive);
322 
323  for (size_t i = 0; i < numExports; ++i) {
324  if (exportProcIDs[i] >= 0) {
325  // record the offset to the sendBuffer for this export
326  indicesTo_[starts[exportProcIDs[i]]] = i;
327  // now increment the offset for this proc
328  ++starts[exportProcIDs[i]];
329  }
330  }
331  // our send buffer will contain the export data for each of the procs
332  // we communicate with, in order by proc id
333  // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
334  // indicesTo now maps each export to the location in our send buffer
335  // associated with the export
336  // data for export i located at sendBuffer[indicesTo[i]]
337  //
338  // starts[i] once again contains the number of exports to
339  // procs 0 through i
340  for (int proc = numProcs-1; proc != 0; --proc) {
341  starts[proc] = starts[proc-1];
342  }
343  starts.front() = 0;
344  starts[numProcs] = numActive;
345  //
346  // starts[proc] once again contains the number of exports to
347  // procs 0 through proc-1
348  // i.e., the start of my data in the sendBuffer
349 
350  // this contains invalid data at procs we don't care about, that is okay
351  procIdsToSendTo_.resize(numSendsToOtherProcs_);
352  startsTo_.resize(numSendsToOtherProcs_);
353  lengthsTo_.resize(numSendsToOtherProcs_);
354 
355  // for each group of sends/exports, record the destination proc,
356  // the length, and the offset for this send into the
357  // send buffer (startsTo_)
358  maxSendLength_ = 0;
359  size_t snd = 0;
360  for (int proc = 0; proc < numProcs; ++proc ) {
361  if (starts[proc+1] != starts[proc]) {
362  lengthsTo_[snd] = starts[proc+1] - starts[proc];
363  startsTo_[snd] = starts[proc];
364  // record max length for all off-proc sends
365  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
366  maxSendLength_ = lengthsTo_[snd];
367  }
368  procIdsToSendTo_[snd] = proc;
369  ++snd;
370  }
371  }
372  }
373 
374  if (sendMessageToSelf_) {
375  --numSendsToOtherProcs_;
376  }
377 
378  // Invert map to see what msgs are received and what length
379  computeReceives();
380 
381 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
382  initializeMpiAdvance();
383 #endif
384 
385  // createFromRecvs() calls createFromSends(), but will set
386  // howInitialized_ again after calling createFromSends().
387  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
388 
389  return totalReceiveLength_;
390 }
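Editorial sketch (not part of Tpetra_Details_DistributorPlan.cpp): a minimal caller of createFromSends(), assuming the plan header is Tpetra_Details_DistributorPlan.hpp and the communicator comes from Tpetra::getDefaultComm(); the export IDs are invented for the example and assume at least two MPI ranks.

#include "Teuchos_Array.hpp"
#include "Tpetra_Core.hpp"
#include "Tpetra_Details_DistributorPlan.hpp"

void exampleCreateFromSends ()
{
  Teuchos::RCP<const Teuchos::Comm<int>> comm = Tpetra::getDefaultComm ();
  Tpetra::Details::DistributorPlan plan (comm);

  // One entry per export; grouping entries by rank keeps the in-place
  // (no send buffer) code path above.
  Teuchos::Array<int> exportProcIDs;
  exportProcIDs.push_back (0);
  exportProcIDs.push_back (0);
  exportProcIDs.push_back (1);

  // Returns totalReceiveLength_: the number of entries this process
  // will receive in the resulting communication pattern.
  const size_t totalReceiveLength = plan.createFromSends (exportProcIDs ());
  (void) totalReceiveLength;
}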
391 
392 void DistributorPlan::createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs)
393 {
394  *this = *getReversePlan();
395  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
396 }
397 
398 void DistributorPlan::createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
399  const Teuchos::ArrayView<const int>& remoteProcIDs)
400 {
 401  // Note: exportProcIDs and remoteProcIDs _must_ be lists that have an
 402  // entry for each GID. If the export/remote proc IDs are taken from the
 403  // getProcs{From|To} lists extracted from a previous Distributor, this
 404  // will generate a wrong answer, because those lists have a unique entry
 405  // for each process ID. A version of this that takes lengthsTo and
 406  // lengthsFrom should be written.
407 
408  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
409 
410  int myProcID = comm_->getRank ();
411  int numProcs = comm_->getSize();
412 
413  const size_t numExportIDs = exportProcIDs.size();
414  Teuchos::Array<size_t> starts (numProcs + 1, 0);
415 
416  size_t numActive = 0;
417  int needSendBuff = 0; // Boolean
418 
419  for(size_t i = 0; i < numExportIDs; i++ )
420  {
421  if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
422  needSendBuff = 1;
423  if( exportProcIDs[i] >= 0 )
424  {
425  ++starts[ exportProcIDs[i] ];
426  ++numActive;
427  }
428  }
429 
430  sendMessageToSelf_ = ( starts[myProcID] != 0 ) ? 1 : 0;
431 
432  numSendsToOtherProcs_ = 0;
433 
 434  if( needSendBuff ) // not grouped by processor; a send buffer and indicesTo_ are needed
435  {
436  if (starts[0] == 0 ) {
437  numSendsToOtherProcs_ = 0;
438  }
439  else {
440  numSendsToOtherProcs_ = 1;
441  }
442  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
443  im1=starts.begin();
444  i != starts.end(); ++i)
445  {
446  if (*i != 0) ++numSendsToOtherProcs_;
447  *i += *im1;
448  im1 = i;
449  }
450  // starts[i] now contains the number of exports to procs 0 through i
451 
452  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
453  i=starts.rbegin()+1;
454  i != starts.rend(); ++i)
455  {
456  *ip1 = *i;
457  ip1 = i;
458  }
459  starts[0] = 0;
460  // starts[i] now contains the number of exports to procs 0 through
461  // i-1, i.e., all procs before proc i
462 
463  indicesTo_.resize(numActive);
464 
465  for (size_t i = 0; i < numExportIDs; ++i) {
466  if (exportProcIDs[i] >= 0) {
467  // record the offset to the sendBuffer for this export
468  indicesTo_[starts[exportProcIDs[i]]] = i;
469  // now increment the offset for this proc
470  ++starts[exportProcIDs[i]];
471  }
472  }
473  for (int proc = numProcs-1; proc != 0; --proc) {
474  starts[proc] = starts[proc-1];
475  }
476  starts.front() = 0;
477  starts[numProcs] = numActive;
478  procIdsToSendTo_.resize(numSendsToOtherProcs_);
479  startsTo_.resize(numSendsToOtherProcs_);
480  lengthsTo_.resize(numSendsToOtherProcs_);
481  maxSendLength_ = 0;
482  size_t snd = 0;
483  for (int proc = 0; proc < numProcs; ++proc ) {
484  if (starts[proc+1] != starts[proc]) {
485  lengthsTo_[snd] = starts[proc+1] - starts[proc];
486  startsTo_[snd] = starts[proc];
487  // record max length for all off-proc sends
488  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
489  maxSendLength_ = lengthsTo_[snd];
490  }
491  procIdsToSendTo_[snd] = proc;
492  ++snd;
493  }
494  }
495  }
496  else {
497  // grouped by proc, no send buffer or indicesTo_ needed
498  numSendsToOtherProcs_ = 0;
499  // Count total number of sends, i.e., total number of procs to
500  // which we are sending. This includes myself, if applicable.
501  for (int i = 0; i < numProcs; ++i) {
502  if (starts[i]) {
503  ++numSendsToOtherProcs_;
504  }
505  }
506 
507  // Not only do we not need these, but we must clear them, as
508  // empty status of indicesTo is a flag used later.
509  indicesTo_.resize(0);
510  // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
511  // includes self sends. Set their values to zeros.
512  procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
513  startsTo_.assign(numSendsToOtherProcs_,0);
514  lengthsTo_.assign(numSendsToOtherProcs_,0);
515 
 516  // Set startsTo_ to the offset for each send (i.e., for each proc ID),
 517  // and procIdsToSendTo_ to the proc ID for each send. In interpreting
 518  // this code, remember that we are assuming contiguity; that is why
 519  // procIndex skips through the ranks.
520  {
521  size_t procIndex = 0;
522  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
523  while (exportProcIDs[procIndex] < 0) {
524  ++procIndex; // skip all negative proc IDs
525  }
526  startsTo_[i] = procIndex;
527  int procID = exportProcIDs[procIndex];
528  procIdsToSendTo_[i] = procID;
529  procIndex += starts[procID];
530  }
531  }
532  // sort the startsTo and proc IDs together, in ascending order, according
533  // to proc IDs
534  if (numSendsToOtherProcs_ > 0) {
535  sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
536  }
537  // compute the maximum send length
538  maxSendLength_ = 0;
539  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
540  int procID = procIdsToSendTo_[i];
541  lengthsTo_[i] = starts[procID];
542  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
543  maxSendLength_ = lengthsTo_[i];
544  }
545  }
546  }
547 
548 
549  numSendsToOtherProcs_ -= sendMessageToSelf_;
550  std::vector<int> recv_list;
551  recv_list.reserve(numSendsToOtherProcs_); //reserve an initial guess for size needed
552 
553  int last_pid=-2;
554  for(int i=0; i<remoteProcIDs.size(); i++) {
555  if(remoteProcIDs[i]>last_pid) {
556  recv_list.push_back(remoteProcIDs[i]);
557  last_pid = remoteProcIDs[i];
558  }
559  else if (remoteProcIDs[i]<last_pid)
 560  throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected remoteProcIDs to be in sorted order");
561  }
562  numReceives_ = recv_list.size();
563  if(numReceives_) {
564  procsFrom_.assign(numReceives_,0);
565  lengthsFrom_.assign(numReceives_,0);
566  indicesFrom_.assign(numReceives_,0);
567  startsFrom_.assign(numReceives_,0);
568  }
569  for(size_t i=0,j=0; i<numReceives_; ++i) {
570  int jlast=j;
571  procsFrom_[i] = recv_list[i];
572  startsFrom_[i] = j;
573  for( ; j<(size_t)remoteProcIDs.size() &&
574  remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
575  lengthsFrom_[i] = j-jlast;
576  }
577  totalReceiveLength_ = remoteProcIDs.size();
578  indicesFrom_.clear ();
579  numReceives_-=sendMessageToSelf_;
580 
581 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
582  initializeMpiAdvance();
583 #endif
584 }
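Editorial worked example (not from the original source): with sorted remoteProcIDs = [0, 0, 3, 3, 3, 5] and no self-message, the receive-side bookkeeping above yields procsFrom_ = {0, 3, 5}, startsFrom_ = {0, 2, 5}, lengthsFrom_ = {2, 3, 1}, numReceives_ = 3, and totalReceiveLength_ = 6.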
585 
586 Teuchos::RCP<DistributorPlan> DistributorPlan::getReversePlan() const {
587  if (reversePlan_.is_null()) createReversePlan();
588  return reversePlan_;
589 }
590 
591 void DistributorPlan::createReversePlan() const
592 {
593  reversePlan_ = Teuchos::rcp(new DistributorPlan(comm_));
594  reversePlan_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
595  reversePlan_->sendType_ = sendType_;
596 
597  // The total length of all the sends of this DistributorPlan. We
598  // calculate it because it's the total length of all the receives
599  // of the reverse DistributorPlan.
600  size_t totalSendLength =
601  std::accumulate(lengthsTo_.begin(), lengthsTo_.end(), 0);
602 
603  // The maximum length of any of the receives of this DistributorPlan.
604  // We calculate it because it's the maximum length of any of the
605  // sends of the reverse DistributorPlan.
606  size_t maxReceiveLength = 0;
607  const int myProcID = comm_->getRank();
608  for (size_t i=0; i < numReceives_; ++i) {
609  if (procsFrom_[i] != myProcID) {
610  // Don't count receives for messages sent by myself to myself.
611  if (lengthsFrom_[i] > maxReceiveLength) {
612  maxReceiveLength = lengthsFrom_[i];
613  }
614  }
615  }
616 
617  reversePlan_->sendMessageToSelf_ = sendMessageToSelf_;
618  reversePlan_->numSendsToOtherProcs_ = numReceives_;
619  reversePlan_->procIdsToSendTo_ = procsFrom_;
620  reversePlan_->startsTo_ = startsFrom_;
621  reversePlan_->lengthsTo_ = lengthsFrom_;
622  reversePlan_->maxSendLength_ = maxReceiveLength;
623  reversePlan_->indicesTo_ = indicesFrom_;
624  reversePlan_->numReceives_ = numSendsToOtherProcs_;
625  reversePlan_->totalReceiveLength_ = totalSendLength;
626  reversePlan_->lengthsFrom_ = lengthsTo_;
627  reversePlan_->procsFrom_ = procIdsToSendTo_;
628  reversePlan_->startsFrom_ = startsTo_;
629  reversePlan_->indicesFrom_ = indicesTo_;
630 
631 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
632  // is there a smarter way to do this
633  reversePlan_->initializeMpiAdvance();
634 #endif
635 }
636 
637 void DistributorPlan::computeReceives()
638 {
639  using Teuchos::Array;
640  using Teuchos::ArrayRCP;
641  using Teuchos::as;
642  using Teuchos::CommStatus;
643  using Teuchos::CommRequest;
644  using Teuchos::ireceive;
645  using Teuchos::RCP;
646  using Teuchos::rcp;
647  using Teuchos::REDUCE_SUM;
648  using Teuchos::receive;
649  using Teuchos::reduce;
650  using Teuchos::scatter;
651  using Teuchos::send;
652  using Teuchos::waitAll;
653 
654  const int myRank = comm_->getRank();
655  const int numProcs = comm_->getSize();
656 
657  const int mpiTag = DEFAULT_MPI_TAG;
658 
659  // toProcsFromMe[i] == the number of messages sent by this process
660  // to process i. The data in numSendsToOtherProcs_, procIdsToSendTo_, and lengthsTo_
661  // concern the contiguous sends. Therefore, each process will be
662  // listed in procIdsToSendTo_ at most once, and so toProcsFromMe[i] will
663  // either be 0 or 1.
664  {
665  Array<int> toProcsFromMe (numProcs, 0);
666 #ifdef HAVE_TPETRA_DEBUG
667  bool counting_error = false;
668 #endif // HAVE_TPETRA_DEBUG
669  for (size_t i = 0; i < (numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0)); ++i) {
670 #ifdef HAVE_TPETRA_DEBUG
671  if (toProcsFromMe[procIdsToSendTo_[i]] != 0) {
672  counting_error = true;
673  }
674 #endif // HAVE_TPETRA_DEBUG
675  toProcsFromMe[procIdsToSendTo_[i]] = 1;
676  }
677 #ifdef HAVE_TPETRA_DEBUG
678  // Note that SHARED_TEST_FOR_EXCEPTION does a global reduction
679  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
680  "Tpetra::Distributor::computeReceives: There was an error on at least "
 681  "one process in counting the number of messages sent by that process to "
 682  "the other processes. Please report this bug to the Tpetra developers.",
683  *comm_);
684 #endif // HAVE_TPETRA_DEBUG
685 
686  // Compute the number of receives that this process needs to
687  // post. The number of receives includes any self sends (i.e.,
688  // messages sent by this process to itself).
689  //
 690  // (We will use numReceives_ below to post exactly that
691  // number of receives, with MPI_ANY_SOURCE as the sending rank.
692  // This will tell us from which processes this process expects
693  // to receive, and how many packets of data we expect to receive
694  // from each process.)
695  //
696  // toProcsFromMe[i] is the number of messages sent by this
697  // process to process i. Compute the sum (elementwise) of all
698  // the toProcsFromMe arrays on all processes in the
699  // communicator. If the array x is that sum, then if this
700  // process has rank j, x[j] is the number of messages sent
701  // to process j, that is, the number of receives on process j
702  // (including any messages sent by process j to itself).
703  //
704  // Yes, this requires storing and operating on an array of
705  // length P, where P is the number of processes in the
706  // communicator. Epetra does this too. Avoiding this O(P)
707  // memory bottleneck would require some research.
708  //
709  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
710  // implement this O(P) memory algorithm.
711  //
712  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
713  // process (0) from toProcsFromMe, to numRecvsOnEachProc.
714  // Then, scatter the latter, so that each process p gets
715  // numRecvsOnEachProc[p].
716  //
717  // 2. Like #1, but use MPI_Reduce_scatter instead of
718  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
719  // optimized to reduce the number of messages, but
720  // MPI_Reduce_scatter is more general than we need (it
721  // allows the equivalent of MPI_Scatterv). See Bug 6336.
722  //
723  // 3. Do an all-reduce on toProcsFromMe, and let my process
724  // (with rank myRank) get numReceives_ from
725  // toProcsFromMe[myRank]. The HPCCG miniapp uses the
726  // all-reduce method.
727  //
728  // Approaches 1 and 3 have the same critical path length.
729  // However, #3 moves more data. This is because the final
730  // result is just one integer, but #3 moves a whole array of
731  // results to all the processes. This is why we use Approach 1
732  // here.
733  //
734  // mfh 12 Apr 2013: See discussion in createFromSends() about
735  // how we could use this communication to propagate an error
736  // flag for "free" in a release build.
737 
738  const int root = 0; // rank of root process of the reduction
739  Array<int> numRecvsOnEachProc; // temp; only needed on root
740  if (myRank == root) {
741  numRecvsOnEachProc.resize (numProcs);
742  }
743  int numReceivesAsInt = 0; // output
744  reduce<int, int> (toProcsFromMe.getRawPtr (),
745  numRecvsOnEachProc.getRawPtr (),
746  numProcs, REDUCE_SUM, root, *comm_);
747  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
748  &numReceivesAsInt, 1, root, *comm_);
749  numReceives_ = static_cast<size_t> (numReceivesAsInt);
750  }
751 
752  // Now we know numReceives_, which is this process' number of
753  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
754  // with this number of entries.
755  lengthsFrom_.assign (numReceives_, 0);
756  procsFrom_.assign (numReceives_, 0);
757 
758  //
759  // Ask (via nonblocking receive) each process from which we are
760  // receiving how many packets we should expect from it in the
761  // communication pattern.
762  //
763 
764  // At this point, numReceives_ includes any self message that
765  // there may be. At the end of this routine, we'll subtract off
766  // the self message (if there is one) from numReceives_. In this
767  // routine, we don't need to receive a message from ourselves in
768  // order to figure out our lengthsFrom_ and source process ID; we
769  // can just ask ourselves directly. Thus, the actual number of
770  // nonblocking receives we post here does not include the self
771  // message.
772  const size_t actualNumReceives = numReceives_ - (sendMessageToSelf_ ? 1 : 0);
773 
774  // Teuchos' wrapper for nonblocking receives requires receive
775  // buffers that it knows won't go away. This is why we use RCPs,
776  // one RCP per nonblocking receive request. They get allocated in
777  // the loop below.
778  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
779  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
780  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
781 
782  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
783  // (receive data from any process).
784 #ifdef HAVE_MPI
785  const int anySourceProc = MPI_ANY_SOURCE;
786 #else
787  const int anySourceProc = -1;
788 #endif
789 
790  // Post the (nonblocking) receives.
791  for (size_t i = 0; i < actualNumReceives; ++i) {
792  // Once the receive completes, we can ask the corresponding
793  // CommStatus object (output by wait()) for the sending process'
794  // ID (which we'll assign to procsFrom_[i] -- don't forget to
795  // do that!).
796  lengthsFromBuffers[i].resize (1);
797  lengthsFromBuffers[i][0] = as<size_t> (0);
798  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
799  mpiTag, *comm_);
800  }
801 
802  // Post the sends: Tell each process to which we are sending how
803  // many packets it should expect from us in the communication
804  // pattern. We could use nonblocking sends here, as long as we do
805  // a waitAll() on all the sends and receives at once.
806  //
807  // We assume that numSendsToOtherProcs_ and sendMessageToSelf_ have already been
808  // set. The value of numSendsToOtherProcs_ (my process' number of sends) does
809  // not include any message that it might send to itself.
810  for (size_t i = 0; i < numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0); ++i) {
811  if (procIdsToSendTo_[i] != myRank) {
812  // Send a message to procIdsToSendTo_[i], telling that process that
813  // this communication pattern will send that process
814  // lengthsTo_[i] blocks of packets.
815  const size_t* const lengthsTo_i = &lengthsTo_[i];
816  send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), mpiTag, *comm_);
817  }
818  else {
819  // We don't need a send in the self-message case. If this
820  // process will send a message to itself in the communication
821  // pattern, then the last element of lengthsFrom_ and
822  // procsFrom_ corresponds to the self-message. Of course
823  // this process knows how long the message is, and the process
824  // ID is its own process ID.
825  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
826  procsFrom_[numReceives_-1] = myRank;
827  }
828  }
829 
830  //
831  // Wait on all the receives. When they arrive, check the status
832  // output of wait() for the receiving process ID, unpack the
833  // request buffers into lengthsFrom_, and set procsFrom_ from the
834  // status.
835  //
836  waitAll (*comm_, requests (), statuses ());
837  for (size_t i = 0; i < actualNumReceives; ++i) {
838  lengthsFrom_[i] = *lengthsFromBuffers[i];
839  procsFrom_[i] = statuses[i]->getSourceRank ();
840  }
841 
842  // Sort the procsFrom_ array, and apply the same permutation to
843  // lengthsFrom_. This ensures that procsFrom_[i] and
 844  // lengthsFrom_[i] refer to the same thing.
845  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
846 
847  // Compute indicesFrom_
848  totalReceiveLength_ =
849  std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
850  indicesFrom_.clear ();
851 
852  startsFrom_.clear ();
853  startsFrom_.reserve (numReceives_);
854  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
855  startsFrom_.push_back(j);
856  j += lengthsFrom_[i];
857  }
858 
859  if (sendMessageToSelf_) {
860  --numReceives_;
861  }
862 }
863 
864 void DistributorPlan::setParameterList(const Teuchos::RCP<Teuchos::ParameterList>& plist)
865 {
866  using Teuchos::FancyOStream;
867  using Teuchos::getIntegralValue;
868  using Teuchos::ParameterList;
869  using Teuchos::parameterList;
870  using Teuchos::RCP;
871  using std::endl;
872 
873  if (! plist.is_null()) {
874  RCP<const ParameterList> validParams = getValidParameters ();
875  plist->validateParametersAndSetDefaults (*validParams);
876 
877  const Details::EDistributorSendType sendType =
878  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
879 
880  // Now that we've validated the input list, save the results.
881  sendType_ = sendType;
882 
883  // ParameterListAcceptor semantics require pointer identity of the
884  // sublist passed to setParameterList(), so we save the pointer.
885  this->setMyParamList (plist);
886  }
887 }
888 
889 Teuchos::Array<std::string> distributorSendTypes()
890 {
891  Teuchos::Array<std::string> sendTypes;
892  sendTypes.push_back ("Isend");
893  sendTypes.push_back ("Send");
894  sendTypes.push_back ("Alltoall");
895 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
896  sendTypes.push_back ("MpiAdvanceAlltoall");
897  sendTypes.push_back ("MpiAdvanceNbralltoallv");
898 #endif
899  return sendTypes;
900 }
901 
902 Teuchos::RCP<const Teuchos::ParameterList>
903 DistributorPlan::getValidParameters() const
904 {
905  using Teuchos::Array;
906  using Teuchos::ParameterList;
907  using Teuchos::parameterList;
908  using Teuchos::RCP;
909  using Teuchos::setStringToIntegralParameter;
910 
911  Array<std::string> sendTypes = distributorSendTypes ();
912  const std::string defaultSendType ("Send");
913  Array<Details::EDistributorSendType> sendTypeEnums;
914  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
915  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
916  sendTypeEnums.push_back (Details::DISTRIBUTOR_ALLTOALL);
917 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
918  sendTypeEnums.push_back (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL);
919  sendTypeEnums.push_back (Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV);
920 #endif
921 
922  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
923 
924  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
925  defaultSendType, "When using MPI, the variant of send to use in "
926  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
927  plist->set ("Timer Label","","Label for Time Monitor output");
928 
929  return Teuchos::rcp_const_cast<const ParameterList> (plist);
930 }
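Editorial sketch (not part of the original file): selecting the send variant through the validated parameter list. "Isend" is one of the names returned by distributorSendTypes(); the DistributorPlan reference and the timer label are placeholders.

#include "Teuchos_ParameterList.hpp"
#include "Teuchos_RCP.hpp"

void exampleSetSendType (Tpetra::Details::DistributorPlan& plan)
{
  Teuchos::RCP<Teuchos::ParameterList> plist =
    Teuchos::parameterList ("Tpetra::Distributor");
  plist->set ("Send type", std::string ("Isend"));
  plist->set ("Timer Label", std::string ("exampleLabel"));
  // setParameterList() validates against getValidParameters() and
  // records the choice in sendType_.
  plan.setParameterList (plist);
}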
931 
932 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
933 
934 // Used by Teuchos::RCP to clean up an owned MPIX_Comm*
935 struct MpixCommDeallocator {
936  void free(MPIX_Comm **comm) const {
937  MPIX_Comm_free(*comm);
938  }
939 };
940 
941 void DistributorPlan::initializeMpiAdvance() {
942 
 943  // Assert that the MPIX communicator has not been initialized yet; if it has, that is a bug to investigate.
944  TEUCHOS_ASSERT(mpixComm_.is_null());
945 
 946  // Use the plan's members to initialize the graph for neighborhood mode, or just the MPIX communicator for non-neighborhood mode.
947  Teuchos::RCP<const Teuchos::MpiComm<int> > mpiComm = Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int> >(comm_);
948  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm> > rawComm = mpiComm->getRawMpiComm();
949  int err = 0;
950  if (sendType_ == DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
951  MPIX_Comm **mpixComm = new(MPIX_Comm*);
952  err = MPIX_Comm_init(mpixComm, (*rawComm)());
953  mpixComm_ = Teuchos::RCP(mpixComm,
954  MpixCommDeallocator(),
955  true /*take ownership*/
956  );
957  }
958  else if (sendType_ == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
959  int numRecvs = (int)(numReceives_ + (sendMessageToSelf_ ? 1 : 0));
960  int *sourceRanks = procsFrom_.data();
961 
962  // int *sourceWeights = static_cast<int*>(lengthsFrom_.data());// lengthsFrom_ may not be int
963  const int *sourceWeights = MPI_UNWEIGHTED;
964  int numSends = (int)(numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0));
965  int *destRanks = procIdsToSendTo_.data();
966 
967  // int *destWeights = static_cast<int*>(lengthsTo_.data()); // lengthsTo_ may not be int
968  const int *destWeights = MPI_UNWEIGHTED; // lengthsTo_ may not be int
969 
970  MPIX_Comm **mpixComm = new(MPIX_Comm*);
971  err = MPIX_Dist_graph_create_adjacent((*rawComm)(), numRecvs, sourceRanks, sourceWeights, numSends, destRanks, destWeights, MPI_INFO_NULL, false, mpixComm);
972  mpixComm_ = Teuchos::RCP(mpixComm,
973  MpixCommDeallocator(),
974  true /*take ownership*/
975  );
976  }
977 
978  TEUCHOS_ASSERT(err == 0);
979 }
980 #endif
981 
982 
983  DistributorPlan::SubViewLimits DistributorPlan::getImportViewLimits(size_t numPackets) const {
984  const size_t actualNumReceives = getNumReceives() + (hasSelfMessage() ? 1 : 0);
985 
986  IndexView importStarts(actualNumReceives);
987  IndexView importLengths(actualNumReceives);
988 
989  size_t offset = 0;
990  for (size_t i = 0; i < actualNumReceives; ++i) {
991  importStarts[i] = offset;
992  offset += getLengthsFrom()[i] * numPackets;
993  importLengths[i] = getLengthsFrom()[i] * numPackets;
994  }
995  return std::make_pair(importStarts, importLengths);
996  }
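 Editorial worked example: with getLengthsFrom() = {2, 3} (two receives, counting any self-message) and numPackets = 4, this overload returns importStarts = {0, 8} and importLengths = {8, 12}.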
997 
998  DistributorPlan::SubViewLimits DistributorPlan::getImportViewLimits(const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) const {
999 
1000  const size_t actualNumReceives = getNumReceives() + (hasSelfMessage() ? 1 : 0);
1001 
1002  IndexView importStarts(actualNumReceives);
1003  IndexView importLengths(actualNumReceives);
1004 
1005  size_t offset = 0;
1006  size_t curLIDoffset = 0;
1007  for (size_t i = 0; i < actualNumReceives; ++i) {
1008  size_t totalPacketsFrom_i = 0;
1009  for (size_t j = 0; j < getLengthsFrom()[i]; ++j) {
1010  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
1011  }
1012  curLIDoffset += getLengthsFrom()[i];
1013  importStarts[i] = offset;
1014  offset += totalPacketsFrom_i;
1015  importLengths[i] = totalPacketsFrom_i;
1016  }
1017  return std::make_pair(importStarts, importLengths);
1018  }
1019 
1020 
1021  DistributorPlan::SubViewLimits DistributorPlan::getExportViewLimits(size_t numPackets) const {
1022  if (getIndicesTo().is_null()) {
1023 
1024  const size_t actualNumSends = getNumSends() + (hasSelfMessage() ? 1 : 0);
1025  IndexView exportStarts(actualNumSends);
1026  IndexView exportLengths(actualNumSends);
1027  for (size_t pp = 0; pp < actualNumSends; ++pp) {
1028  exportStarts[pp] = getStartsTo()[pp] * numPackets;
1029  exportLengths[pp] = getLengthsTo()[pp] * numPackets;
1030  }
1031  return std::make_pair(exportStarts, exportLengths);
1032  } else {
1033  const size_t numIndices = getIndicesTo().size();
1034  IndexView exportStarts(numIndices);
1035  IndexView exportLengths(numIndices);
1036  for (size_t j = 0; j < numIndices; ++j) {
1037  exportStarts[j] = getIndicesTo()[j]*numPackets;
1038  exportLengths[j] = numPackets;
1039  }
1040  return std::make_pair(exportStarts, exportLengths);
1041  }
1042  }
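 Editorial worked examples: in the contiguous branch, getStartsTo() = {0, 2}, getLengthsTo() = {2, 1}, and numPackets = 3 give exportStarts = {0, 6} and exportLengths = {6, 3}; in the indicesTo_ branch, getIndicesTo() = {2, 0, 1} with numPackets = 3 gives exportStarts = {6, 0, 3} and exportLengths = {3, 3, 3}.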
1043 
1044  DistributorPlan::SubViewLimits DistributorPlan::getExportViewLimits(const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID) const {
1045  if (getIndicesTo().is_null()) {
1046  const size_t actualNumSends = getNumSends() + (hasSelfMessage() ? 1 : 0);
1047  IndexView exportStarts(actualNumSends);
1048  IndexView exportLengths(actualNumSends);
1049  size_t offset = 0;
1050  for (size_t pp = 0; pp < actualNumSends; ++pp) {
1051  size_t numPackets = 0;
1052  for (size_t j = getStartsTo()[pp];
1053  j < getStartsTo()[pp] + getLengthsTo()[pp]; ++j) {
1054  numPackets += numExportPacketsPerLID[j];
1055  }
1056  exportStarts[pp] = offset;
1057  offset += numPackets;
1058  exportLengths[pp] = numPackets;
1059  }
1060  return std::make_pair(exportStarts, exportLengths);
1061  } else {
1062  const size_t numIndices = getIndicesTo().size();
1063  IndexView exportStarts(numIndices);
1064  IndexView exportLengths(numIndices);
1065  size_t offset = 0;
1066  for (size_t j = 0; j < numIndices; ++j) {
1067  exportStarts[j] = offset;
1068  offset += numExportPacketsPerLID[j];
1069  exportLengths[j] = numExportPacketsPerLID[j];
1070  }
1071  return std::make_pair(exportStarts, exportLengths);
1072  }
1073  }
1074 
1075 }
1076 }
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
static bool debug()
Whether Tpetra is in debug mode.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, msg)
Print or throw an efficiency warning.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2, const bool stableSort=false)
Sort the first array, and apply the resulting permutation to the second array.
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
Stand-alone utility functions and macros.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
EDistributorSendType
The type of MPI send that Distributor should use.
Declaration of Tpetra::Details::Behavior, a class that describes Tpetra&#39;s behavior.