Tpetra_Details_DistributorPlan.cpp
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 #include "Tpetra_Details_DistributorPlan.hpp"
11 
12 #include "Teuchos_StandardParameterEntryValidators.hpp"
13 #include "Tpetra_Util.hpp"
14 #include "Tpetra_Details_Behavior.hpp"
15 #include <numeric>
16 
17 namespace Tpetra {
18 namespace Details {
19 
20 std::string
21 DistributorSendTypeEnumToString (EDistributorSendType sendType)
22 {
23  if (sendType == DISTRIBUTOR_ISEND) {
24  return "Isend";
25  }
26  else if (sendType == DISTRIBUTOR_SEND) {
27  return "Send";
28  }
29  else if (sendType == DISTRIBUTOR_ALLTOALL) {
30  return "Alltoall";
31  }
32 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
33  else if (sendType == DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
34  return "MpiAdvanceAlltoall";
35  }
36  else if (sendType == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
37  return "MpiAdvanceNbralltoallv";
38  }
39 #endif
40  else {
41  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
42  "EDistributorSendType enum value " << sendType << ".");
43  }
44 }
45 
46 EDistributorSendType
47 DistributorSendTypeStringToEnum (const std::string_view s)
48 {
49  if (s == "Isend") return DISTRIBUTOR_ISEND;
50  if (s == "Send") return DISTRIBUTOR_SEND;
51  if (s == "Alltoall") return DISTRIBUTOR_ALLTOALL;
52 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
53  if (s == "MpiAdvanceAlltoall") return DISTRIBUTOR_MPIADVANCE_ALLTOALL;
54  if (s == "MpiAdvanceNbralltoallv") return DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV;
55 #endif
56  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid string to convert to EDistributorSendType enum value: " << s);
57 }
58 
60 const std::string &validSendTypeOrThrow(const std::string &s) {
61  const auto valids = distributorSendTypes();
62  if (std::find(valids.begin(), valids.end(), s) == valids.end()) {
63  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid string for EDistributorSendType enum value: " << s);
64  }
65  return s;
66 }
67 
68 std::string
69 DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
70 {
71  switch (how) {
72  case Details::DISTRIBUTOR_NOT_INITIALIZED:
73  return "Not initialized yet";
74  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
75  return "By createFromSends";
76  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
77  return "By createFromRecvs";
78  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
79  return "By createFromSendsAndRecvs";
80  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
81  return "By createReverseDistributor";
82  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
83  return "By copy constructor";
84  default:
85  return "INVALID";
86  }
87 }
88 
89 DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
90  : comm_(comm),
91 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
92  mpixComm_(Teuchos::null),
93 #endif
94  howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
95  reversePlan_(Teuchos::null),
96  sendType_(DistributorSendTypeStringToEnum(Behavior::defaultSendType())),
97  sendMessageToSelf_(false),
98  numSendsToOtherProcs_(0),
99  maxSendLength_(0),
100  numReceives_(0),
101  totalReceiveLength_(0)
102 { }
103 
104 DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
105  : comm_(otherPlan.comm_),
106 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
107  mpixComm_(otherPlan.mpixComm_),
108 #endif
109  howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
110  reversePlan_(otherPlan.reversePlan_),
111  sendType_(otherPlan.sendType_),
112  sendMessageToSelf_(otherPlan.sendMessageToSelf_),
113  numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
114  procIdsToSendTo_(otherPlan.procIdsToSendTo_),
115  startsTo_(otherPlan.startsTo_),
116  lengthsTo_(otherPlan.lengthsTo_),
117  maxSendLength_(otherPlan.maxSendLength_),
118  indicesTo_(otherPlan.indicesTo_),
119  numReceives_(otherPlan.numReceives_),
120  totalReceiveLength_(otherPlan.totalReceiveLength_),
121  lengthsFrom_(otherPlan.lengthsFrom_),
122  procsFrom_(otherPlan.procsFrom_),
123  startsFrom_(otherPlan.startsFrom_),
124  indicesFrom_(otherPlan.indicesFrom_)
125 { }
126 
127 size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs) {
128  using Teuchos::outArg;
129  using Teuchos::REDUCE_MAX;
130  using Teuchos::reduceAll;
131  using std::endl;
132  const char rawPrefix[] = "Tpetra::DistributorPlan::createFromSends";
133 
134  const size_t numExports = exportProcIDs.size();
135  const int myProcID = comm_->getRank();
136  const int numProcs = comm_->getSize();
137  const bool debug = Details::Behavior::debug("Distributor");
138 
139  // exportProcIDs tells us the communication pattern for this
140  // distributor. It dictates the way that the export data will be
141  // interpreted in doPosts(). We want to perform at most one
142  // send per process in doPosts; this is for two reasons:
143  // * minimize latency / overhead in the comm routines (nice)
144  // * match the number of receives and sends between processes
145  // (necessary)
146  //
147  // Teuchos::Comm requires that the data for a send are contiguous
148  // in a send buffer. Therefore, if the data in the send buffer
149  // for doPosts() are not contiguous, they will need to be copied
150  // into a contiguous buffer. The user has specified this
151  // noncontiguous pattern and we can't do anything about it.
152  // However, if they do not provide an efficient pattern, we will
153  // warn them if one of the following compile-time options has been
154  // set:
155  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
156  //
157  // If the data are contiguous, then we can post the sends in situ
158  // (i.e., without needing to copy them into a send buffer).
159  //
160  // Determine contiguity. There are a number of ways to do this:
161  // * If the export IDs are sorted, then all exports to a
162  // particular proc must be contiguous. This is what Epetra does.
163  // * If the export ID of the current export already has been
164  // listed, then the previous listing should correspond to the
165  // same export. This tests contiguity, but not sortedness.
166  //
167  // Both of these tests require O(n), where n is the number of
168  // exports. However, the latter will positively identify a greater
169  // portion of contiguous patterns. We use the latter method.
170  //
171  // Check to see if values are grouped by procs without gaps
172  // If so, indices_to -> 0.
173 
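  // Worked example (illustrative only, not part of the original source):
  // with numProcs = 3 and exportProcIDs = {1, 1, 0, 2, 2, 2}, the list is
  // grouped by process, so no send buffer is needed.  The grouped branch
  // below would compute
  //   procIdsToSendTo_ = {0, 1, 2},
  //   startsTo_        = {2, 0, 3},
  //   lengthsTo_       = {1, 2, 3},
  // and, on process 0, sendMessageToSelf_ = true and (after the self
  // message is subtracted at the end) numSendsToOtherProcs_ = 2.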
174  if (debug) {
175  // Test whether any process in the communicator got an invalid
176  // process ID. If badID != -1 on this process, then it equals
177  // this process' rank. The max of all badID over all processes
178  // is the max rank which has an invalid process ID.
179  int badID = -1;
180  for (size_t i = 0; i < numExports; ++i) {
181  const int exportID = exportProcIDs[i];
182  if (exportID >= numProcs || exportID < 0) {
183  badID = myProcID;
184  break;
185  }
186  }
187  int gbl_badID;
188  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
189  TEUCHOS_TEST_FOR_EXCEPTION
190  (gbl_badID >= 0, std::runtime_error, rawPrefix << "Proc "
191  << gbl_badID << ", perhaps among other processes, got a bad "
192  "send process ID.");
193  }
194 
195  // Set up data structures for quick traversal of arrays.
196  // This contains the number of sends for each process ID.
197  //
198  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
199  // that create an array of length the number of processes in the
200  // communicator (plus one). Given how this code uses this array,
201  // it should be straightforward to replace it with a hash table or
202  // some other more space-efficient data structure. In practice,
203  // most of the entries of starts should be zero for a sufficiently
204  // large process count, unless the communication pattern is dense.
205  // Note that it's important to be able to iterate through keys (i
206  // for which starts[i] is nonzero) in increasing order.
207  Teuchos::Array<size_t> starts (numProcs + 1, 0);
208 
209  // numActive is the number of sends that are not Null
210  size_t numActive = 0;
211  int needSendBuff = 0; // Boolean
212 
213  for (size_t i = 0; i < numExports; ++i) {
214  const int exportID = exportProcIDs[i];
215  if (exportID >= 0) {
216  // exportID is a valid process ID. Increment the number of
217  // messages this process will send to that process.
218  ++starts[exportID];
219 
220  // If we're sending more than one message to process exportID,
221  // then it is possible that the data are not contiguous.
222  // Check by seeing if the previous process ID in the list
223  // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
224  // because if starts[exportID] > 1, then i must be > 1 (since
225  // the starts array was filled with zeros initially).
226 
227  // null entries break continuity.
228  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
229  if (needSendBuff == 0 && starts[exportID] > 1 &&
230  exportID != exportProcIDs[i-1]) {
231  needSendBuff = 1;
232  }
233  ++numActive;
234  }
235  }
236 
237 #if defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
238  {
239  int global_needSendBuff;
240  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
241  outArg (global_needSendBuff));
242  TPETRA_EFFICIENCY_WARNING(
243  global_needSendBuff != 0,
244  "::createFromSends: Grouping export IDs together by process rank often "
245  "improves performance.");
246  }
247 #endif
248 
249  // Determine from the caller's data whether or not the current
250  // process should send (a) message(s) to itself.
251  if (starts[myProcID] != 0) {
252  sendMessageToSelf_ = true;
253  }
254  else {
255  sendMessageToSelf_ = false;
256  }
257 
258  if (! needSendBuff) {
259  // grouped by proc, no send buffer or indicesTo_ needed
260  numSendsToOtherProcs_ = 0;
261  // Count total number of sends, i.e., total number of procs to
262  // which we are sending. This includes myself, if applicable.
263  for (int i = 0; i < numProcs; ++i) {
264  if (starts[i]) {
265  ++numSendsToOtherProcs_;
266  }
267  }
268 
269  // Not only do we not need these, but we must clear them, as
270  // empty status of indicesTo is a flag used later.
271  indicesTo_.resize(0);
272  // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
273  // includes self sends. Set their values to zeros.
274  procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
275  startsTo_.assign(numSendsToOtherProcs_,0);
276  lengthsTo_.assign(numSendsToOtherProcs_,0);
277 
278  // set startsTo to the offset for each send (i.e., each proc ID)
279  // set procsTo to the proc ID for each send
280  // in interpreting this code, remember that we are assuming contiguity;
281  // that is why the index skips through the ranks
282  {
283  size_t procIndex = 0;
284  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
285  while (exportProcIDs[procIndex] < 0) {
286  ++procIndex; // skip all negative proc IDs
287  }
288  startsTo_[i] = procIndex;
289  int procID = exportProcIDs[procIndex];
290  procIdsToSendTo_[i] = procID;
291  procIndex += starts[procID];
292  }
293  }
294  // sort the startsTo and proc IDs together, in ascending order, according
295  // to proc IDs
296  if (numSendsToOtherProcs_ > 0) {
297  sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
298  }
299  // compute the maximum send length
300  maxSendLength_ = 0;
301  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
302  int procID = procIdsToSendTo_[i];
303  lengthsTo_[i] = starts[procID];
304  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
305  maxSendLength_ = lengthsTo_[i];
306  }
307  }
308  }
309  else {
310  // not grouped by proc, need send buffer and indicesTo_
311 
312  // starts[i] is the number of sends to proc i
313  // numActive equals number of sends total, \sum_i starts[i]
314 
315  // this loop starts at starts[1], so explicitly check starts[0]
316  if (starts[0] == 0 ) {
317  numSendsToOtherProcs_ = 0;
318  }
319  else {
320  numSendsToOtherProcs_ = 1;
321  }
322  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
323  im1=starts.begin();
324  i != starts.end(); ++i)
325  {
326  if (*i != 0) ++numSendsToOtherProcs_;
327  *i += *im1;
328  im1 = i;
329  }
330  // starts[i] now contains the number of exports to procs 0 through i
331 
332  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
333  i=starts.rbegin()+1;
334  i != starts.rend(); ++i)
335  {
336  *ip1 = *i;
337  ip1 = i;
338  }
339  starts[0] = 0;
340  // starts[i] now contains the number of exports to procs 0 through
341  // i-1, i.e., all procs before proc i
342 
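  // Worked example (illustrative only, not part of the original source):
  // with numProcs = 3 and exportProcIDs = {2, 0, 2}, the per-process
  // counts give starts = {1, 0, 2, 0} (length numProcs + 1).  The forward
  // pass above turns this into the inclusive prefix sum {1, 1, 3, 3}, and
  // the backward shift plus starts[0] = 0 yields {0, 1, 1, 3}, so that
  // starts[p] is the offset of process p's data in the send buffer.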
343  indicesTo_.resize(numActive);
344 
345  for (size_t i = 0; i < numExports; ++i) {
346  if (exportProcIDs[i] >= 0) {
347  // record the offset to the sendBuffer for this export
348  indicesTo_[starts[exportProcIDs[i]]] = i;
349  // now increment the offset for this proc
350  ++starts[exportProcIDs[i]];
351  }
352  }
353  // our send buffer will contain the export data for each of the procs
354  // we communicate with, in order by proc id
355  // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
356  // indicesTo now maps each export to the location in our send buffer
357  // associated with the export
358  // data for export i located at sendBuffer[indicesTo[i]]
359  //
360  // starts[i] once again contains the number of exports to
361  // procs 0 through i
362  for (int proc = numProcs-1; proc != 0; --proc) {
363  starts[proc] = starts[proc-1];
364  }
365  starts.front() = 0;
366  starts[numProcs] = numActive;
367  //
368  // starts[proc] once again contains the number of exports to
369  // procs 0 through proc-1
370  // i.e., the start of my data in the sendBuffer
371 
372  // this contains invalid data at procs we don't care about, that is okay
373  procIdsToSendTo_.resize(numSendsToOtherProcs_);
374  startsTo_.resize(numSendsToOtherProcs_);
375  lengthsTo_.resize(numSendsToOtherProcs_);
376 
377  // for each group of sends/exports, record the destination proc,
378  // the length, and the offset for this send into the
379  // send buffer (startsTo_)
380  maxSendLength_ = 0;
381  size_t snd = 0;
382  for (int proc = 0; proc < numProcs; ++proc ) {
383  if (starts[proc+1] != starts[proc]) {
384  lengthsTo_[snd] = starts[proc+1] - starts[proc];
385  startsTo_[snd] = starts[proc];
386  // record max length for all off-proc sends
387  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
388  maxSendLength_ = lengthsTo_[snd];
389  }
390  procIdsToSendTo_[snd] = proc;
391  ++snd;
392  }
393  }
394  }
395 
396  if (sendMessageToSelf_) {
397  --numSendsToOtherProcs_;
398  }
399 
400  // Invert map to see what msgs are received and what length
401  computeReceives();
402 
403 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
404  initializeMpiAdvance();
405 #endif
406 
407  // createFromRecvs() calls createFromSends(), but will set
408  // howInitialized_ again after calling createFromSends().
409  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
410 
411  return totalReceiveLength_;
412 }
413 
414 void DistributorPlan::createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs)
415 {
     createFromSends(remoteProcIDs);
416  *this = *getReversePlan();
417  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
418 }
419 
420 void DistributorPlan::createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
421  const Teuchos::ArrayView<const int>& remoteProcIDs)
422 {
423  // Note: exportProcIDs and remoteProcIDs _must_ have an entry for each
424  // GID.  If the export/remote proc IDs are taken from the
425  // getProcs{From|To} lists extracted from a previous Distributor, this
426  // will compute a wrong answer, because those lists have a unique entry
427  // per process ID.  A version of this method that takes lengthsTo and
428  // lengthsFrom should be written.
429 
430  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
431 
432  int myProcID = comm_->getRank ();
433  int numProcs = comm_->getSize();
434 
435  const size_t numExportIDs = exportProcIDs.size();
436  Teuchos::Array<size_t> starts (numProcs + 1, 0);
437 
438  size_t numActive = 0;
439  int needSendBuff = 0; // Boolean
440 
441  for(size_t i = 0; i < numExportIDs; i++ )
442  {
443  if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
444  needSendBuff = 1;
445  if( exportProcIDs[i] >= 0 )
446  {
447  ++starts[ exportProcIDs[i] ];
448  ++numActive;
449  }
450  }
451 
452  sendMessageToSelf_ = ( starts[myProcID] != 0 ) ? 1 : 0;
453 
454  numSendsToOtherProcs_ = 0;
455 
456  if( needSendBuff ) // not grouped by proc; need a send buffer and indicesTo_
457  {
458  if (starts[0] == 0 ) {
459  numSendsToOtherProcs_ = 0;
460  }
461  else {
462  numSendsToOtherProcs_ = 1;
463  }
464  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
465  im1=starts.begin();
466  i != starts.end(); ++i)
467  {
468  if (*i != 0) ++numSendsToOtherProcs_;
469  *i += *im1;
470  im1 = i;
471  }
472  // starts[i] now contains the number of exports to procs 0 through i
473 
474  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
475  i=starts.rbegin()+1;
476  i != starts.rend(); ++i)
477  {
478  *ip1 = *i;
479  ip1 = i;
480  }
481  starts[0] = 0;
482  // starts[i] now contains the number of exports to procs 0 through
483  // i-1, i.e., all procs before proc i
484 
485  indicesTo_.resize(numActive);
486 
487  for (size_t i = 0; i < numExportIDs; ++i) {
488  if (exportProcIDs[i] >= 0) {
489  // record the offset to the sendBuffer for this export
490  indicesTo_[starts[exportProcIDs[i]]] = i;
491  // now increment the offset for this proc
492  ++starts[exportProcIDs[i]];
493  }
494  }
495  for (int proc = numProcs-1; proc != 0; --proc) {
496  starts[proc] = starts[proc-1];
497  }
498  starts.front() = 0;
499  starts[numProcs] = numActive;
500  procIdsToSendTo_.resize(numSendsToOtherProcs_);
501  startsTo_.resize(numSendsToOtherProcs_);
502  lengthsTo_.resize(numSendsToOtherProcs_);
503  maxSendLength_ = 0;
504  size_t snd = 0;
505  for (int proc = 0; proc < numProcs; ++proc ) {
506  if (starts[proc+1] != starts[proc]) {
507  lengthsTo_[snd] = starts[proc+1] - starts[proc];
508  startsTo_[snd] = starts[proc];
509  // record max length for all off-proc sends
510  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
511  maxSendLength_ = lengthsTo_[snd];
512  }
513  procIdsToSendTo_[snd] = proc;
514  ++snd;
515  }
516  }
517  }
518  else {
519  // grouped by proc, no send buffer or indicesTo_ needed
520  numSendsToOtherProcs_ = 0;
521  // Count total number of sends, i.e., total number of procs to
522  // which we are sending. This includes myself, if applicable.
523  for (int i = 0; i < numProcs; ++i) {
524  if (starts[i]) {
525  ++numSendsToOtherProcs_;
526  }
527  }
528 
529  // Not only do we not need these, but we must clear them, as
530  // empty status of indicesTo is a flag used later.
531  indicesTo_.resize(0);
532  // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
533  // includes self sends. Set their values to zeros.
534  procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
535  startsTo_.assign(numSendsToOtherProcs_,0);
536  lengthsTo_.assign(numSendsToOtherProcs_,0);
537 
538  // set startsTo to the offset for each send (i.e., each proc ID)
539  // set procsTo to the proc ID for each send
540  // in interpreting this code, remember that we are assuming contiguity;
541  // that is why the index skips through the ranks
542  {
543  size_t procIndex = 0;
544  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
545  while (exportProcIDs[procIndex] < 0) {
546  ++procIndex; // skip all negative proc IDs
547  }
548  startsTo_[i] = procIndex;
549  int procID = exportProcIDs[procIndex];
550  procIdsToSendTo_[i] = procID;
551  procIndex += starts[procID];
552  }
553  }
554  // sort the startsTo and proc IDs together, in ascending order, according
555  // to proc IDs
556  if (numSendsToOtherProcs_ > 0) {
557  sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
558  }
559  // compute the maximum send length
560  maxSendLength_ = 0;
561  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
562  int procID = procIdsToSendTo_[i];
563  lengthsTo_[i] = starts[procID];
564  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
565  maxSendLength_ = lengthsTo_[i];
566  }
567  }
568  }
569 
570 
571  numSendsToOtherProcs_ -= sendMessageToSelf_;
572  std::vector<int> recv_list;
573  recv_list.reserve(numSendsToOtherProcs_); //reserve an initial guess for size needed
574 
575  int last_pid=-2;
576  for(int i=0; i<remoteProcIDs.size(); i++) {
577  if(remoteProcIDs[i]>last_pid) {
578  recv_list.push_back(remoteProcIDs[i]);
579  last_pid = remoteProcIDs[i];
580  }
581  else if (remoteProcIDs[i]<last_pid)
582  throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
583  }
584  numReceives_ = recv_list.size();
585  if(numReceives_) {
586  procsFrom_.assign(numReceives_,0);
587  lengthsFrom_.assign(numReceives_,0);
588  indicesFrom_.assign(numReceives_,0);
589  startsFrom_.assign(numReceives_,0);
590  }
591  for(size_t i=0,j=0; i<numReceives_; ++i) {
592  int jlast=j;
593  procsFrom_[i] = recv_list[i];
594  startsFrom_[i] = j;
595  for( ; j<(size_t)remoteProcIDs.size() &&
596  remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
597  lengthsFrom_[i] = j-jlast;
598  }
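  // Worked example (illustrative only, not part of the original source):
  // remoteProcIDs = {0, 0, 2, 2, 2, 5} (already sorted by process) gives
  //   procsFrom_   = {0, 2, 5},
  //   startsFrom_  = {0, 2, 5},
  //   lengthsFrom_ = {2, 3, 1},
  // and totalReceiveLength_ = 6.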
599  totalReceiveLength_ = remoteProcIDs.size();
600  indicesFrom_.clear ();
601  numReceives_-=sendMessageToSelf_;
602 
603 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
604  initializeMpiAdvance();
605 #endif
606 }
607 
608 Teuchos::RCP<DistributorPlan> DistributorPlan::getReversePlan() const {
609  if (reversePlan_.is_null()) createReversePlan();
610  return reversePlan_;
611 }
612 
613 void DistributorPlan::createReversePlan() const
614 {
615  reversePlan_ = Teuchos::rcp(new DistributorPlan(comm_));
616  reversePlan_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
617  reversePlan_->sendType_ = sendType_;
618 
619  // The total length of all the sends of this DistributorPlan. We
620  // calculate it because it's the total length of all the receives
621  // of the reverse DistributorPlan.
622  size_t totalSendLength =
623  std::accumulate(lengthsTo_.begin(), lengthsTo_.end(), 0);
624 
625  // The maximum length of any of the receives of this DistributorPlan.
626  // We calculate it because it's the maximum length of any of the
627  // sends of the reverse DistributorPlan.
628  size_t maxReceiveLength = 0;
629  const int myProcID = comm_->getRank();
630  for (size_t i=0; i < numReceives_; ++i) {
631  if (procsFrom_[i] != myProcID) {
632  // Don't count receives for messages sent by myself to myself.
633  if (lengthsFrom_[i] > maxReceiveLength) {
634  maxReceiveLength = lengthsFrom_[i];
635  }
636  }
637  }
638 
639  reversePlan_->sendMessageToSelf_ = sendMessageToSelf_;
640  reversePlan_->numSendsToOtherProcs_ = numReceives_;
641  reversePlan_->procIdsToSendTo_ = procsFrom_;
642  reversePlan_->startsTo_ = startsFrom_;
643  reversePlan_->lengthsTo_ = lengthsFrom_;
644  reversePlan_->maxSendLength_ = maxReceiveLength;
645  reversePlan_->indicesTo_ = indicesFrom_;
646  reversePlan_->numReceives_ = numSendsToOtherProcs_;
647  reversePlan_->totalReceiveLength_ = totalSendLength;
648  reversePlan_->lengthsFrom_ = lengthsTo_;
649  reversePlan_->procsFrom_ = procIdsToSendTo_;
650  reversePlan_->startsFrom_ = startsTo_;
651  reversePlan_->indicesFrom_ = indicesTo_;
652 
653 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
654  // Is there a smarter way to do this?
655  reversePlan_->initializeMpiAdvance();
656 #endif
657 }
658 
659 void DistributorPlan::computeReceives()
660 {
661  using Teuchos::Array;
662  using Teuchos::ArrayRCP;
663  using Teuchos::as;
664  using Teuchos::CommStatus;
665  using Teuchos::CommRequest;
666  using Teuchos::ireceive;
667  using Teuchos::RCP;
668  using Teuchos::rcp;
669  using Teuchos::REDUCE_SUM;
670  using Teuchos::receive;
671  using Teuchos::reduce;
672  using Teuchos::scatter;
673  using Teuchos::send;
674  using Teuchos::waitAll;
675 
676  const int myRank = comm_->getRank();
677  const int numProcs = comm_->getSize();
678 
679  const int mpiTag = DEFAULT_MPI_TAG;
680 
681  // toProcsFromMe[i] == the number of messages sent by this process
682  // to process i. The data in numSendsToOtherProcs_, procIdsToSendTo_, and lengthsTo_
683  // concern the contiguous sends. Therefore, each process will be
684  // listed in procIdsToSendTo_ at most once, and so toProcsFromMe[i] will
685  // either be 0 or 1.
686  {
687  Array<int> toProcsFromMe (numProcs, 0);
688 #ifdef HAVE_TPETRA_DEBUG
689  bool counting_error = false;
690 #endif // HAVE_TPETRA_DEBUG
691  for (size_t i = 0; i < (numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0)); ++i) {
692 #ifdef HAVE_TPETRA_DEBUG
693  if (toProcsFromMe[procIdsToSendTo_[i]] != 0) {
694  counting_error = true;
695  }
696 #endif // HAVE_TPETRA_DEBUG
697  toProcsFromMe[procIdsToSendTo_[i]] = 1;
698  }
699 #ifdef HAVE_TPETRA_DEBUG
700  // Note that SHARED_TEST_FOR_EXCEPTION does a global reduction
701  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
702  "Tpetra::Distributor::computeReceives: There was an error on at least "
703  "one process in counting the number of messages sent by that process to "
704  "the other processes.  Please report this bug to the Tpetra developers.",
705  *comm_);
706 #endif // HAVE_TPETRA_DEBUG
707 
708  // Compute the number of receives that this process needs to
709  // post. The number of receives includes any self sends (i.e.,
710  // messages sent by this process to itself).
711  //
712  // (We will use numReceives_ below to post exactly that
713  // number of receives, with MPI_ANY_SOURCE as the sending rank.
714  // This will tell us from which processes this process expects
715  // to receive, and how many packets of data we expect to receive
716  // from each process.)
717  //
718  // toProcsFromMe[i] is the number of messages sent by this
719  // process to process i. Compute the sum (elementwise) of all
720  // the toProcsFromMe arrays on all processes in the
721  // communicator. If the array x is that sum, then if this
722  // process has rank j, x[j] is the number of messages sent
723  // to process j, that is, the number of receives on process j
724  // (including any messages sent by process j to itself).
725  //
726  // Yes, this requires storing and operating on an array of
727  // length P, where P is the number of processes in the
728  // communicator. Epetra does this too. Avoiding this O(P)
729  // memory bottleneck would require some research.
730  //
731  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
732  // implement this O(P) memory algorithm.
733  //
734  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
735  // process (0) from toProcsFromMe, to numRecvsOnEachProc.
736  // Then, scatter the latter, so that each process p gets
737  // numRecvsOnEachProc[p].
738  //
739  // 2. Like #1, but use MPI_Reduce_scatter instead of
740  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
741  // optimized to reduce the number of messages, but
742  // MPI_Reduce_scatter is more general than we need (it
743  // allows the equivalent of MPI_Scatterv). See Bug 6336.
744  //
745  // 3. Do an all-reduce on toProcsFromMe, and let my process
746  // (with rank myRank) get numReceives_ from
747  // toProcsFromMe[myRank]. The HPCCG miniapp uses the
748  // all-reduce method.
749  //
750  // Approaches 1 and 3 have the same critical path length.
751  // However, #3 moves more data. This is because the final
752  // result is just one integer, but #3 moves a whole array of
753  // results to all the processes. This is why we use Approach 1
754  // here.
755  //
756  // mfh 12 Apr 2013: See discussion in createFromSends() about
757  // how we could use this communication to propagate an error
758  // flag for "free" in a release build.
759 
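  // Worked example (illustrative only, not part of the original source),
  // with P = 3 processes:
  //   toProcsFromMe on proc 0 = {0, 1, 1}
  //   toProcsFromMe on proc 1 = {1, 0, 1}
  //   toProcsFromMe on proc 2 = {0, 1, 0}
  // The elementwise sum on the root is {1, 2, 2}; after the scatter,
  // proc 0 learns numReceives_ = 1 and procs 1 and 2 each learn
  // numReceives_ = 2.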
760  const int root = 0; // rank of root process of the reduction
761  Array<int> numRecvsOnEachProc; // temp; only needed on root
762  if (myRank == root) {
763  numRecvsOnEachProc.resize (numProcs);
764  }
765  int numReceivesAsInt = 0; // output
766  reduce<int, int> (toProcsFromMe.getRawPtr (),
767  numRecvsOnEachProc.getRawPtr (),
768  numProcs, REDUCE_SUM, root, *comm_);
769  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
770  &numReceivesAsInt, 1, root, *comm_);
771  numReceives_ = static_cast<size_t> (numReceivesAsInt);
772  }
773 
774  // Now we know numReceives_, which is this process' number of
775  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
776  // with this number of entries.
777  lengthsFrom_.assign (numReceives_, 0);
778  procsFrom_.assign (numReceives_, 0);
779 
780  //
781  // Ask (via nonblocking receive) each process from which we are
782  // receiving how many packets we should expect from it in the
783  // communication pattern.
784  //
785 
786  // At this point, numReceives_ includes any self message that
787  // there may be. At the end of this routine, we'll subtract off
788  // the self message (if there is one) from numReceives_. In this
789  // routine, we don't need to receive a message from ourselves in
790  // order to figure out our lengthsFrom_ and source process ID; we
791  // can just ask ourselves directly. Thus, the actual number of
792  // nonblocking receives we post here does not include the self
793  // message.
794  const size_t actualNumReceives = numReceives_ - (sendMessageToSelf_ ? 1 : 0);
795 
796  // Teuchos' wrapper for nonblocking receives requires receive
797  // buffers that it knows won't go away. This is why we use RCPs,
798  // one RCP per nonblocking receive request. They get allocated in
799  // the loop below.
800  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
801  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
802  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
803 
804  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
805  // (receive data from any process).
806 #ifdef HAVE_MPI
807  const int anySourceProc = MPI_ANY_SOURCE;
808 #else
809  const int anySourceProc = -1;
810 #endif
811 
812  // Post the (nonblocking) receives.
813  for (size_t i = 0; i < actualNumReceives; ++i) {
814  // Once the receive completes, we can ask the corresponding
815  // CommStatus object (output by wait()) for the sending process'
816  // ID (which we'll assign to procsFrom_[i] -- don't forget to
817  // do that!).
818  lengthsFromBuffers[i].resize (1);
819  lengthsFromBuffers[i][0] = as<size_t> (0);
820  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
821  mpiTag, *comm_);
822  }
823 
824  // Post the sends: Tell each process to which we are sending how
825  // many packets it should expect from us in the communication
826  // pattern. We could use nonblocking sends here, as long as we do
827  // a waitAll() on all the sends and receives at once.
828  //
829  // We assume that numSendsToOtherProcs_ and sendMessageToSelf_ have already been
830  // set. The value of numSendsToOtherProcs_ (my process' number of sends) does
831  // not include any message that it might send to itself.
832  for (size_t i = 0; i < numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0); ++i) {
833  if (procIdsToSendTo_[i] != myRank) {
834  // Send a message to procIdsToSendTo_[i], telling that process that
835  // this communication pattern will send that process
836  // lengthsTo_[i] blocks of packets.
837  const size_t* const lengthsTo_i = &lengthsTo_[i];
838  send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), mpiTag, *comm_);
839  }
840  else {
841  // We don't need a send in the self-message case. If this
842  // process will send a message to itself in the communication
843  // pattern, then the last element of lengthsFrom_ and
844  // procsFrom_ corresponds to the self-message. Of course
845  // this process knows how long the message is, and the process
846  // ID is its own process ID.
847  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
848  procsFrom_[numReceives_-1] = myRank;
849  }
850  }
851 
852  //
853  // Wait on all the receives. When they arrive, check the status
854  // output of wait() for the receiving process ID, unpack the
855  // request buffers into lengthsFrom_, and set procsFrom_ from the
856  // status.
857  //
858  waitAll (*comm_, requests (), statuses ());
859  for (size_t i = 0; i < actualNumReceives; ++i) {
860  lengthsFrom_[i] = *lengthsFromBuffers[i];
861  procsFrom_[i] = statuses[i]->getSourceRank ();
862  }
863 
864  // Sort the procsFrom_ array, and apply the same permutation to
865  // lengthsFrom_. This ensures that procsFrom_[i] and
866  // lengthsFrom_[i] refer to the same thing.
867  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
868 
869  // Compute indicesFrom_
870  totalReceiveLength_ =
871  std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
872  indicesFrom_.clear ();
873 
874  startsFrom_.clear ();
875  startsFrom_.reserve (numReceives_);
876  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
877  startsFrom_.push_back(j);
878  j += lengthsFrom_[i];
879  }
880 
881  if (sendMessageToSelf_) {
882  --numReceives_;
883  }
884 }
885 
886 void DistributorPlan::setParameterList(const Teuchos::RCP<Teuchos::ParameterList>& plist)
887 {
888  using Teuchos::FancyOStream;
889  using Teuchos::getIntegralValue;
890  using Teuchos::ParameterList;
891  using Teuchos::parameterList;
892  using Teuchos::RCP;
893  using std::endl;
894 
895  if (! plist.is_null()) {
896  RCP<const ParameterList> validParams = getValidParameters ();
897  plist->validateParametersAndSetDefaults (*validParams);
898 
899  const Details::EDistributorSendType sendType =
900  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
901 
902  // Now that we've validated the input list, save the results.
903  sendType_ = sendType;
904 
905  // ParameterListAcceptor semantics require pointer identity of the
906  // sublist passed to setParameterList(), so we save the pointer.
907  this->setMyParamList (plist);
908  }
909 }
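// Usage sketch (illustrative only; "plan" is a hypothetical DistributorPlan
// instance, and callers normally go through Tpetra::Distributor instead):
//
//   Teuchos::RCP<Teuchos::ParameterList> plist =
//     Teuchos::parameterList ("Tpetra::Distributor");
//   plist->set ("Send type", std::string ("Isend"));
//   plan.setParameterList (plist);
//
// Any "Send type" value not listed by distributorSendTypes() is rejected
// during validation.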
910 
911 Teuchos::Array<std::string> distributorSendTypes()
912 {
913  Teuchos::Array<std::string> sendTypes;
914  sendTypes.push_back ("Isend");
915  sendTypes.push_back ("Send");
916  sendTypes.push_back ("Alltoall");
917 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
918  sendTypes.push_back ("MpiAdvanceAlltoall");
919  sendTypes.push_back ("MpiAdvanceNbralltoallv");
920 #endif
921  return sendTypes;
922 }
923 
924 Teuchos::Array<EDistributorSendType> distributorSendTypeEnums() {
925  Teuchos::Array<EDistributorSendType> res;
926  res.push_back (DISTRIBUTOR_ISEND);
927  res.push_back (DISTRIBUTOR_SEND);
928  res.push_back (DISTRIBUTOR_ALLTOALL);
929 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
930  res.push_back (DISTRIBUTOR_MPIADVANCE_ALLTOALL);
931  res.push_back (DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV);
932 #endif
933  return res;
934 }
935 
936 Teuchos::RCP<const Teuchos::ParameterList>
937 DistributorPlan::getValidParameters() const
938 {
939  using Teuchos::Array;
940  using Teuchos::ParameterList;
941  using Teuchos::parameterList;
942  using Teuchos::RCP;
943  using Teuchos::setStringToIntegralParameter;
944 
945  Array<std::string> sendTypes = distributorSendTypes ();
946  const Array<Details::EDistributorSendType> sendTypeEnums = distributorSendTypeEnums ();
947 
948  const std::string validatedSendType = validSendTypeOrThrow(Behavior::defaultSendType());
949 
950  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
951 
952  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
953  validatedSendType, "When using MPI, the variant of send to use in "
954  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
955  plist->set ("Timer Label","","Label for Time Monitor output");
956 
957  return Teuchos::rcp_const_cast<const ParameterList> (plist);
958 }
959 
960 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
961 
962 // Used by Teuchos::RCP to clean up an owned MPIX_Comm*
963 struct MpixCommDeallocator {
964  void free(MPIX_Comm **comm) const {
965  MPIX_Comm_free(*comm);
966  }
967 };
968 
969 void DistributorPlan::initializeMpiAdvance() {
970 
971  // Assert that the MPIX communicator is null; if it is not, we need to figure out why.
972  TEUCHOS_ASSERT(mpixComm_.is_null());
973 
974  // Use the members to initialize the graph for neighborhood mode, or just the MPIX communicator for non-neighborhood mode
975  Teuchos::RCP<const Teuchos::MpiComm<int> > mpiComm = Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int> >(comm_);
976  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm> > rawComm = mpiComm->getRawMpiComm();
977  int err = 0;
978  if (sendType_ == DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
979  MPIX_Comm **mpixComm = new(MPIX_Comm*);
980  err = MPIX_Comm_init(mpixComm, (*rawComm)());
981  mpixComm_ = Teuchos::RCP(mpixComm,
982  MpixCommDeallocator(),
983  true /*take ownership*/
984  );
985  }
986  else if (sendType_ == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
987  int numRecvs = (int)(numReceives_ + (sendMessageToSelf_ ? 1 : 0));
988  int *sourceRanks = procsFrom_.data();
989 
990  // int *sourceWeights = static_cast<int*>(lengthsFrom_.data());// lengthsFrom_ may not be int
991  const int *sourceWeights = MPI_UNWEIGHTED;
992  int numSends = (int)(numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0));
993  int *destRanks = procIdsToSendTo_.data();
994 
995  // int *destWeights = static_cast<int*>(lengthsTo_.data()); // lengthsTo_ may not be int
996  const int *destWeights = MPI_UNWEIGHTED; // lengthsTo_ may not be int
997 
998  MPIX_Comm **mpixComm = new(MPIX_Comm*);
999  err = MPIX_Dist_graph_create_adjacent((*rawComm)(), numRecvs, sourceRanks, sourceWeights, numSends, destRanks, destWeights, MPI_INFO_NULL, false, mpixComm);
1000  mpixComm_ = Teuchos::RCP(mpixComm,
1001  MpixCommDeallocator(),
1002  true /*take ownership*/
1003  );
1004  }
1005 
1006  TEUCHOS_ASSERT(err == 0);
1007 }
1008 #endif
1009 
1010 
1011  DistributorPlan::SubViewLimits DistributorPlan::getImportViewLimits(size_t numPackets) const {
1012  const size_t actualNumReceives = getNumReceives() + (hasSelfMessage() ? 1 : 0);
1013 
1014  IndexView importStarts(actualNumReceives);
1015  IndexView importLengths(actualNumReceives);
1016 
1017  size_t offset = 0;
1018  for (size_t i = 0; i < actualNumReceives; ++i) {
1019  importStarts[i] = offset;
1020  offset += getLengthsFrom()[i] * numPackets;
1021  importLengths[i] = getLengthsFrom()[i] * numPackets;
1022  }
1023  return std::make_pair(importStarts, importLengths);
1024  }
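 // Worked example (illustrative only, not part of the original source):
 // with getLengthsFrom() = {3, 2}, no self message, and numPackets = 2,
 // the method above returns importStarts = {0, 6} and importLengths = {6, 4}.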
1025 
1026  DistributorPlan::SubViewLimits DistributorPlan::getImportViewLimits(const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) const {
1027 
1028  const size_t actualNumReceives = getNumReceives() + (hasSelfMessage() ? 1 : 0);
1029 
1030  IndexView importStarts(actualNumReceives);
1031  IndexView importLengths(actualNumReceives);
1032 
1033  size_t offset = 0;
1034  size_t curLIDoffset = 0;
1035  for (size_t i = 0; i < actualNumReceives; ++i) {
1036  size_t totalPacketsFrom_i = 0;
1037  for (size_t j = 0; j < getLengthsFrom()[i]; ++j) {
1038  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
1039  }
1040  curLIDoffset += getLengthsFrom()[i];
1041  importStarts[i] = offset;
1042  offset += totalPacketsFrom_i;
1043  importLengths[i] = totalPacketsFrom_i;
1044  }
1045  return std::make_pair(importStarts, importLengths);
1046  }
1047 
1048 
1049  DistributorPlan::SubViewLimits DistributorPlan::getExportViewLimits(size_t numPackets) const {
1050  if (getIndicesTo().is_null()) {
1051 
1052  const size_t actualNumSends = getNumSends() + (hasSelfMessage() ? 1 : 0);
1053  IndexView exportStarts(actualNumSends);
1054  IndexView exportLengths(actualNumSends);
1055  for (size_t pp = 0; pp < actualNumSends; ++pp) {
1056  exportStarts[pp] = getStartsTo()[pp] * numPackets;
1057  exportLengths[pp] = getLengthsTo()[pp] * numPackets;
1058  }
1059  return std::make_pair(exportStarts, exportLengths);
1060  } else {
1061  const size_t numIndices = getIndicesTo().size();
1062  IndexView exportStarts(numIndices);
1063  IndexView exportLengths(numIndices);
1064  for (size_t j = 0; j < numIndices; ++j) {
1065  exportStarts[j] = getIndicesTo()[j]*numPackets;
1066  exportLengths[j] = numPackets;
1067  }
1068  return std::make_pair(exportStarts, exportLengths);
1069  }
1070  }
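 // Worked example (illustrative only, not part of the original source)
 // for the method above: with indicesTo_ = {1, 0, 2} (non-null, so the
 // data are not grouped by process) and numPackets = 2, it returns
 // exportStarts = {2, 0, 4} and exportLengths = {2, 2, 2}.  In the grouped
 // case (indicesTo_ null) with startsTo_ = {0, 3} and lengthsTo_ = {3, 2},
 // it would return exportStarts = {0, 6} and exportLengths = {6, 4}.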
1071 
1072  DistributorPlan::SubViewLimits DistributorPlan::getExportViewLimits(const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID) const {
1073  if (getIndicesTo().is_null()) {
1074  const size_t actualNumSends = getNumSends() + (hasSelfMessage() ? 1 : 0);
1075  IndexView exportStarts(actualNumSends);
1076  IndexView exportLengths(actualNumSends);
1077  size_t offset = 0;
1078  for (size_t pp = 0; pp < actualNumSends; ++pp) {
1079  size_t numPackets = 0;
1080  for (size_t j = getStartsTo()[pp];
1081  j < getStartsTo()[pp] + getLengthsTo()[pp]; ++j) {
1082  numPackets += numExportPacketsPerLID[j];
1083  }
1084  exportStarts[pp] = offset;
1085  offset += numPackets;
1086  exportLengths[pp] = numPackets;
1087  }
1088  return std::make_pair(exportStarts, exportLengths);
1089  } else {
1090  const size_t numIndices = getIndicesTo().size();
1091  IndexView exportStarts(numIndices);
1092  IndexView exportLengths(numIndices);
1093  size_t offset = 0;
1094  for (size_t j = 0; j < numIndices; ++j) {
1095  exportStarts[j] = offset;
1096  offset += numExportPacketsPerLID[j];
1097  exportLengths[j] = numExportPacketsPerLID[j];
1098  }
1099  return std::make_pair(exportStarts, exportLengths);
1100  }
1101  }
1102 
1103 }
1104 }