Tpetra_Details_DistributorPlan.cpp
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 #include "Tpetra_Details_DistributorPlan.hpp"
11 
12 #include "Teuchos_StandardParameterEntryValidators.hpp"
13 #include "Tpetra_Util.hpp"
14 #include "Tpetra_Details_Behavior.hpp"
15 #include <numeric>
16 
17 namespace Tpetra {
18 namespace Details {
19 
20 std::string
21 DistributorSendTypeEnumToString (EDistributorSendType sendType)
22 {
23  if (sendType == DISTRIBUTOR_ISEND) {
24  return "Isend";
25  }
26  else if (sendType == DISTRIBUTOR_SEND) {
27  return "Send";
28  }
29  else if (sendType == DISTRIBUTOR_ALLTOALL) {
30  return "Alltoall";
31  }
32 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
33  else if (sendType == DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
34  return "MpiAdvanceAlltoall";
35  }
36  else if (sendType == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
37  return "MpiAdvanceNbralltoallv";
38  }
39 #endif
40  else {
41  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
42  "EDistributorSendType enum value " << sendType << ".");
43  }
44 }
45 
46 EDistributorSendType
47 DistributorSendTypeStringToEnum (const std::string_view s)
48 {
49  if (s == "Isend") return DISTRIBUTOR_ISEND;
50  if (s == "Send") return DISTRIBUTOR_SEND;
51  if (s == "Alltoall") return DISTRIBUTOR_ALLTOALL;
52 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
53  if (s == "MpiAdvanceAlltoall") return DISTRIBUTOR_MPIADVANCE_ALLTOALL;
54  if (s == "MpiAdvanceNbralltoallv") return DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV;
55 #endif
56  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid string to convert to EDistributorSendType enum value: " << s);
57 }
58 
60 const std::string &validSendTypeOrThrow(const std::string &s) {
61  const auto valids = distributorSendTypes();
62  if (std::find(valids.begin(), valids.end(), s) == valids.end()) {
63  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid string for EDistributorSendType enum value: " << s);
64  }
65  return s;
66 }
67 
68 std::string
69 DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
70 {
71  switch (how) {
72  case Details::DISTRIBUTOR_NOT_INITIALIZED:
73  return "Not initialized yet";
74  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
75  return "By createFromSends";
76  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
77  return "By createFromRecvs";
78  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
79  return "By createFromSendsAndRecvs";
80  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
81  return "By createReverseDistributor";
82  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
83  return "By copy constructor";
84  default:
85  return "INVALID";
86  }
87 }
88 
89 DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
90  : comm_(comm),
91 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
92  mpixComm_(Teuchos::null),
93 #endif
94  howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
95  reversePlan_(Teuchos::null),
96  sendType_(DistributorSendTypeStringToEnum(Behavior::defaultSendType())),
97  sendMessageToSelf_(false),
98  numSendsToOtherProcs_(0),
99  maxSendLength_(0),
100  numReceives_(0),
101  totalReceiveLength_(0)
102 { }
103 
104 DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
105  : comm_(otherPlan.comm_),
106 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
107  mpixComm_(otherPlan.mpixComm_),
108 #endif
109  howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
110  reversePlan_(otherPlan.reversePlan_),
111  sendType_(otherPlan.sendType_),
112  sendMessageToSelf_(otherPlan.sendMessageToSelf_),
113  numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
114  procIdsToSendTo_(otherPlan.procIdsToSendTo_),
115  startsTo_(otherPlan.startsTo_),
116  lengthsTo_(otherPlan.lengthsTo_),
117  maxSendLength_(otherPlan.maxSendLength_),
118  indicesTo_(otherPlan.indicesTo_),
119  numReceives_(otherPlan.numReceives_),
120  totalReceiveLength_(otherPlan.totalReceiveLength_),
121  lengthsFrom_(otherPlan.lengthsFrom_),
122  procsFrom_(otherPlan.procsFrom_),
123  startsFrom_(otherPlan.startsFrom_),
124  indicesFrom_(otherPlan.indicesFrom_)
125 { }
126 
127 size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs) {
128  using Teuchos::outArg;
129  using Teuchos::REDUCE_MAX;
130  using Teuchos::reduceAll;
131  using std::endl;
132  const char rawPrefix[] = "Tpetra::DistributorPlan::createFromSends";
133 
134  const size_t numExports = exportProcIDs.size();
135  const int myProcID = comm_->getRank();
136  const int numProcs = comm_->getSize();
137  const bool debug = Details::Behavior::debug("Distributor");
138 
139  // exportProcIDs tells us the communication pattern for this
140  // distributor. It dictates the way that the export data will be
141  // interpreted in doPosts(). We want to perform at most one
142  // send per process in doPosts; this is for two reasons:
143  // * minimize latency / overhead in the comm routines (nice)
144  // * match the number of receives and sends between processes
145  // (necessary)
146  //
147  // Teuchos::Comm requires that the data for a send are contiguous
148  // in a send buffer. Therefore, if the data in the send buffer
149  // for doPosts() are not contiguous, they will need to be copied
150  // into a contiguous buffer. The user has specified this
151  // noncontiguous pattern and we can't do anything about it.
152  // However, if they do not provide an efficient pattern, we will
153  // warn them if one of the following compile-time options has been
154  // set:
155  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
156  //
157  // If the data are contiguous, then we can post the sends in situ
158  // (i.e., without needing to copy them into a send buffer).
159  //
160  // Determine contiguity. There are a number of ways to do this:
161  // * If the export IDs are sorted, then all exports to a
162  // particular proc must be contiguous. This is what Epetra does.
163  // * If the export ID of the current export already has been
164  // listed, then the previous listing should correspond to the
165  // same export. This tests contiguity, but not sortedness.
166  //
167  // Both of these tests require O(n), where n is the number of
168  // exports. However, the latter will positively identify a greater
169  // portion of contiguous patterns. We use the latter method.
170  //
171  // Check to see if values are grouped by procs without gaps
172  // If so, indices_to -> 0.
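 // [Editorial illustration, not in the original source] For example,
 // exportProcIDs = {0, 0, 2, 2, 2, 5} is grouped by process, so the sends
 // can be posted in place and indicesTo_ stays empty, while
 // exportProcIDs = {0, 2, 0, 2} lists processes 0 and 2 twice with gaps,
 // so a contiguous send buffer (and indicesTo_) must be built.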
173 
174  if (debug) {
175  // Test whether any process in the communicator got an invalid
176  // process ID. If badID != -1 on this process, then it equals
177  // this process' rank. The max of all badID over all processes
178  // is the max rank which has an invalid process ID.
179  int badID = -1;
180  for (size_t i = 0; i < numExports; ++i) {
181  const int exportID = exportProcIDs[i];
182  if (exportID >= numProcs || exportID < 0) {
183  badID = myProcID;
184  break;
185  }
186  }
187  int gbl_badID;
188  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
189  TEUCHOS_TEST_FOR_EXCEPTION
190  (gbl_badID >= 0, std::runtime_error, rawPrefix << "Proc "
191  << gbl_badID << ", perhaps among other processes, got a bad "
192  "send process ID.");
193  }
194 
195  // Set up data structures for quick traversal of arrays.
196  // This contains the number of sends for each process ID.
197  //
198  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
199  // that create an array of length the number of processes in the
200  // communicator (plus one). Given how this code uses this array,
201  // it should be straightforward to replace it with a hash table or
202  // some other more space-efficient data structure. In practice,
203  // most of the entries of starts should be zero for a sufficiently
204  // large process count, unless the communication pattern is dense.
205  // Note that it's important to be able to iterate through keys (i
206  // for which starts[i] is nonzero) in increasing order.
207  Teuchos::Array<size_t> starts (numProcs + 1, 0);
208 
209  // numActive is the number of sends that are not Null
210  size_t numActive = 0;
211  int needSendBuff = 0; // Boolean
212 
213  for (size_t i = 0; i < numExports; ++i) {
214  const int exportID = exportProcIDs[i];
215  if (exportID >= 0) {
216  // exportID is a valid process ID. Increment the number of
217  // messages this process will send to that process.
218  ++starts[exportID];
219 
220  // If we're sending more than one message to process exportID,
221  // then it is possible that the data are not contiguous.
222  // Check by seeing if the previous process ID in the list
223  // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
224  // because if starts[exportID] > 1, then i must be > 1 (since
225  // the starts array was filled with zeros initially).
226 
227  // null entries break continuity.
228  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
229  if (needSendBuff == 0 && starts[exportID] > 1 &&
230  exportID != exportProcIDs[i-1]) {
231  needSendBuff = 1;
232  }
233  ++numActive;
234  }
235  }
236 
237 #if defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
238  {
239  int global_needSendBuff;
240  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
241  outArg (global_needSendBuff));
242  TPETRA_EFFICIENCY_WARNING(
243  global_needSendBuff != 0,
244  "::createFromSends: Grouping export IDs together by process rank often "
245  "improves performance.");
246  }
247 #endif
248 
249  // Determine from the caller's data whether or not the current
250  // process should send (a) message(s) to itself.
251  if (starts[myProcID] != 0) {
252  sendMessageToSelf_ = true;
253  }
254  else {
255  sendMessageToSelf_ = false;
256  }
257 
258  if (! needSendBuff) {
259  // grouped by proc, no send buffer or indicesTo_ needed
260  numSendsToOtherProcs_ = 0;
261  // Count total number of sends, i.e., total number of procs to
262  // which we are sending. This includes myself, if applicable.
263  for (int i = 0; i < numProcs; ++i) {
264  if (starts[i]) {
265  ++numSendsToOtherProcs_;
266  }
267  }
268 
269  // Not only do we not need these, but we must clear them, as
270  // empty status of indicesTo is a flag used later.
271  indicesTo_.resize(0);
272  // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
273  // includes self sends. Set their values to zeros.
274  procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
275  startsTo_.assign(numSendsToOtherProcs_,0);
276  lengthsTo_.assign(numSendsToOtherProcs_,0);
277 
278  // set startsTo to the offset for each send (i.e., each proc ID)
279  // set procsTo to the proc ID for each send
280  // in interpreting this code, remember that we are assuming contiguity
281  // that is why index skips through the ranks
282  {
283  size_t procIndex = 0;
284  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
285  while (exportProcIDs[procIndex] < 0) {
286  ++procIndex; // skip all negative proc IDs
287  }
288  startsTo_[i] = procIndex;
289  int procID = exportProcIDs[procIndex];
290  procIdsToSendTo_[i] = procID;
291  procIndex += starts[procID];
292  }
293  }
294  // sort the startsTo and proc IDs together, in ascending order, according
295  // to proc IDs
296  if (numSendsToOtherProcs_ > 0) {
297  sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
298  }
299  // compute the maximum send length
300  maxSendLength_ = 0;
301  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
302  int procID = procIdsToSendTo_[i];
303  lengthsTo_[i] = starts[procID];
304  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
305  maxSendLength_ = lengthsTo_[i];
306  }
307  }
308  }
309  else {
310  // not grouped by proc, need send buffer and indicesTo_
311 
312  // starts[i] is the number of sends to proc i
313  // numActive equals number of sends total, \sum_i starts[i]
314 
315  // this loop starts at starts[1], so explicitly check starts[0]
316  if (starts[0] == 0 ) {
317  numSendsToOtherProcs_ = 0;
318  }
319  else {
320  numSendsToOtherProcs_ = 1;
321  }
322  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
323  im1=starts.begin();
324  i != starts.end(); ++i)
325  {
326  if (*i != 0) ++numSendsToOtherProcs_;
327  *i += *im1;
328  im1 = i;
329  }
330  // starts[i] now contains the number of exports to procs 0 through i
331 
332  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
333  i=starts.rbegin()+1;
334  i != starts.rend(); ++i)
335  {
336  *ip1 = *i;
337  ip1 = i;
338  }
339  starts[0] = 0;
340  // starts[i] now contains the number of exports to procs 0 through
341  // i-1, i.e., all procs before proc i
342 
343  indicesTo_.resize(numActive);
344 
345  for (size_t i = 0; i < numExports; ++i) {
346  if (exportProcIDs[i] >= 0) {
347  // record the offset to the sendBuffer for this export
348  indicesTo_[starts[exportProcIDs[i]]] = i;
349  // now increment the offset for this proc
350  ++starts[exportProcIDs[i]];
351  }
352  }
353  // our send buffer will contain the export data for each of the procs
354  // we communicate with, in order by proc id
355  // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
356  // indicesTo now maps each export to the location in our send buffer
357  // associated with the export
358  // data for export i located at sendBuffer[indicesTo[i]]
359  //
360  // starts[i] once again contains the number of exports to
361  // procs 0 through i
362  for (int proc = numProcs-1; proc != 0; --proc) {
363  starts[proc] = starts[proc-1];
364  }
365  starts.front() = 0;
366  starts[numProcs] = numActive;
367  //
368  // starts[proc] once again contains the number of exports to
369  // procs 0 through proc-1
370  // i.e., the start of my data in the sendBuffer
371 
372  // this contains invalid data at procs we don't care about, that is okay
373  procIdsToSendTo_.resize(numSendsToOtherProcs_);
374  startsTo_.resize(numSendsToOtherProcs_);
375  lengthsTo_.resize(numSendsToOtherProcs_);
376 
377  // for each group of sends/exports, record the destination proc,
378  // the length, and the offset for this send into the
379  // send buffer (startsTo_)
380  maxSendLength_ = 0;
381  size_t snd = 0;
382  for (int proc = 0; proc < numProcs; ++proc ) {
383  if (starts[proc+1] != starts[proc]) {
384  lengthsTo_[snd] = starts[proc+1] - starts[proc];
385  startsTo_[snd] = starts[proc];
386  // record max length for all off-proc sends
387  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
388  maxSendLength_ = lengthsTo_[snd];
389  }
390  procIdsToSendTo_[snd] = proc;
391  ++snd;
392  }
393  }
394  }
395 
396  if (sendMessageToSelf_) {
397  --numSendsToOtherProcs_;
398  }
399 
400  // Invert map to see what msgs are received and what length
401  computeReceives();
402 
403  // createFromRecvs() calls createFromSends(), but will set
404  // howInitialized_ again after calling createFromSends().
405  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
406 
407  return totalReceiveLength_;
408 }
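// [Editorial illustration, not in the original source] A minimal usage
// sketch of createFromSends; the variable names and the use of
// Tpetra::getDefaultComm() are assumptions, not taken from this file:
//
//   Teuchos::RCP<const Teuchos::Comm<int>> comm = Tpetra::getDefaultComm();
//   DistributorPlan plan(comm);
//   Teuchos::Array<int> exportProcIDs;   // one entry per object to export
//   exportProcIDs.push_back(1);          // object 0 goes to rank 1
//   exportProcIDs.push_back(1);          // object 1 goes to rank 1
//   exportProcIDs.push_back(3);          // object 2 goes to rank 3
//   const size_t numRecvObjs = plan.createFromSends(exportProcIDs());
//
// The return value is the total number of objects this process will
// receive, i.e., totalReceiveLength_.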
409 
410 void DistributorPlan::createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs)
411 {
412  *this = *getReversePlan();
413  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
414 }
415 
416 void DistributorPlan::createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
417  const Teuchos::ArrayView<const int>& remoteProcIDs)
418 {
419  // Note: exportProcIDs and remoteProcIDs _must_ be lists that have an
420  // entry for each GID. If the export/remote proc IDs are taken from the
421  // getProcs{From|To} lists extracted from a previous distributor, this
422  // will generate a wrong answer, because those lists have a unique entry
423  // for each processor id. A version of this with lengthsTo and
424  // lengthsFrom should be made.
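 // (Editorial illustration, not in the original source: if this rank
 // exports GIDs {7, 8, 9} to rank 0 and GID {4} to rank 2, then
 // exportProcIDs must be {0, 0, 0, 2}, one entry per GID, not the unique
 // list {0, 2} that getProcsTo() would return.)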
425 
426  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
427 
428  int myProcID = comm_->getRank ();
429  int numProcs = comm_->getSize();
430 
431  const size_t numExportIDs = exportProcIDs.size();
432  Teuchos::Array<size_t> starts (numProcs + 1, 0);
433 
434  size_t numActive = 0;
435  int needSendBuff = 0; // Boolean
436 
437  for(size_t i = 0; i < numExportIDs; i++ )
438  {
439  if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
440  needSendBuff = 1;
441  if( exportProcIDs[i] >= 0 )
442  {
443  ++starts[ exportProcIDs[i] ];
444  ++numActive;
445  }
446  }
447 
448  sendMessageToSelf_ = ( starts[myProcID] != 0 ) ? 1 : 0;
449 
450  numSendsToOtherProcs_ = 0;
451 
452  if( needSendBuff ) // not grouped by processor; a send buffer and indicesTo_ are needed
453  {
454  if (starts[0] == 0 ) {
455  numSendsToOtherProcs_ = 0;
456  }
457  else {
458  numSendsToOtherProcs_ = 1;
459  }
460  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
461  im1=starts.begin();
462  i != starts.end(); ++i)
463  {
464  if (*i != 0) ++numSendsToOtherProcs_;
465  *i += *im1;
466  im1 = i;
467  }
468  // starts[i] now contains the number of exports to procs 0 through i
469 
470  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
471  i=starts.rbegin()+1;
472  i != starts.rend(); ++i)
473  {
474  *ip1 = *i;
475  ip1 = i;
476  }
477  starts[0] = 0;
478  // starts[i] now contains the number of exports to procs 0 through
479  // i-1, i.e., all procs before proc i
480 
481  indicesTo_.resize(numActive);
482 
483  for (size_t i = 0; i < numExportIDs; ++i) {
484  if (exportProcIDs[i] >= 0) {
485  // record the offset to the sendBuffer for this export
486  indicesTo_[starts[exportProcIDs[i]]] = i;
487  // now increment the offset for this proc
488  ++starts[exportProcIDs[i]];
489  }
490  }
491  for (int proc = numProcs-1; proc != 0; --proc) {
492  starts[proc] = starts[proc-1];
493  }
494  starts.front() = 0;
495  starts[numProcs] = numActive;
496  procIdsToSendTo_.resize(numSendsToOtherProcs_);
497  startsTo_.resize(numSendsToOtherProcs_);
498  lengthsTo_.resize(numSendsToOtherProcs_);
499  maxSendLength_ = 0;
500  size_t snd = 0;
501  for (int proc = 0; proc < numProcs; ++proc ) {
502  if (starts[proc+1] != starts[proc]) {
503  lengthsTo_[snd] = starts[proc+1] - starts[proc];
504  startsTo_[snd] = starts[proc];
505  // record max length for all off-proc sends
506  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
507  maxSendLength_ = lengthsTo_[snd];
508  }
509  procIdsToSendTo_[snd] = proc;
510  ++snd;
511  }
512  }
513  }
514  else {
515  // grouped by proc, no send buffer or indicesTo_ needed
516  numSendsToOtherProcs_ = 0;
517  // Count total number of sends, i.e., total number of procs to
518  // which we are sending. This includes myself, if applicable.
519  for (int i = 0; i < numProcs; ++i) {
520  if (starts[i]) {
521  ++numSendsToOtherProcs_;
522  }
523  }
524 
525  // Not only do we not need these, but we must clear them, as
526  // empty status of indicesTo is a flag used later.
527  indicesTo_.resize(0);
528  // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
529  // includes self sends. Set their values to zeros.
530  procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
531  startsTo_.assign(numSendsToOtherProcs_,0);
532  lengthsTo_.assign(numSendsToOtherProcs_,0);
533 
534  // set startsTo to the offset for each send (i.e., each proc ID)
535  // set procsTo to the proc ID for each send
536  // in interpreting this code, remember that we are assuming contiguity
537  // that is why index skips through the ranks
538  {
539  size_t procIndex = 0;
540  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
541  while (exportProcIDs[procIndex] < 0) {
542  ++procIndex; // skip all negative proc IDs
543  }
544  startsTo_[i] = procIndex;
545  int procID = exportProcIDs[procIndex];
546  procIdsToSendTo_[i] = procID;
547  procIndex += starts[procID];
548  }
549  }
550  // sort the startsTo and proc IDs together, in ascending order, according
551  // to proc IDs
552  if (numSendsToOtherProcs_ > 0) {
553  sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
554  }
555  // compute the maximum send length
556  maxSendLength_ = 0;
557  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
558  int procID = procIdsToSendTo_[i];
559  lengthsTo_[i] = starts[procID];
560  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
561  maxSendLength_ = lengthsTo_[i];
562  }
563  }
564  }
565 
566 
567  numSendsToOtherProcs_ -= sendMessageToSelf_;
568  std::vector<int> recv_list;
569  recv_list.reserve(numSendsToOtherProcs_); //reserve an initial guess for size needed
570 
571  int last_pid=-2;
572  for(int i=0; i<remoteProcIDs.size(); i++) {
573  if(remoteProcIDs[i]>last_pid) {
574  recv_list.push_back(remoteProcIDs[i]);
575  last_pid = remoteProcIDs[i];
576  }
577  else if (remoteProcIDs[i]<last_pid)
578  throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
579  }
580  numReceives_ = recv_list.size();
581  if(numReceives_) {
582  procsFrom_.assign(numReceives_,0);
583  lengthsFrom_.assign(numReceives_,0);
584  indicesFrom_.assign(numReceives_,0);
585  startsFrom_.assign(numReceives_,0);
586  }
587  for(size_t i=0,j=0; i<numReceives_; ++i) {
588  int jlast=j;
589  procsFrom_[i] = recv_list[i];
590  startsFrom_[i] = j;
591  for( ; j<(size_t)remoteProcIDs.size() &&
592  remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
593  lengthsFrom_[i] = j-jlast;
594  }
595  totalReceiveLength_ = remoteProcIDs.size();
596  indicesFrom_.clear ();
597  numReceives_-=sendMessageToSelf_;
598 }
599 
600 Teuchos::RCP<DistributorPlan> DistributorPlan::getReversePlan() const {
601  if (reversePlan_.is_null()) createReversePlan();
602  return reversePlan_;
603 }
604 
605 void DistributorPlan::createReversePlan() const
606 {
607  reversePlan_ = Teuchos::rcp(new DistributorPlan(comm_));
608  reversePlan_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
609  reversePlan_->sendType_ = sendType_;
610 
611  // The total length of all the sends of this DistributorPlan. We
612  // calculate it because it's the total length of all the receives
613  // of the reverse DistributorPlan.
614  size_t totalSendLength =
615  std::accumulate(lengthsTo_.begin(), lengthsTo_.end(), 0);
616 
617  // The maximum length of any of the receives of this DistributorPlan.
618  // We calculate it because it's the maximum length of any of the
619  // sends of the reverse DistributorPlan.
620  size_t maxReceiveLength = 0;
621  const int myProcID = comm_->getRank();
622  for (size_t i=0; i < numReceives_; ++i) {
623  if (procsFrom_[i] != myProcID) {
624  // Don't count receives for messages sent by myself to myself.
625  if (lengthsFrom_[i] > maxReceiveLength) {
626  maxReceiveLength = lengthsFrom_[i];
627  }
628  }
629  }
630 
631  reversePlan_->sendMessageToSelf_ = sendMessageToSelf_;
632  reversePlan_->numSendsToOtherProcs_ = numReceives_;
633  reversePlan_->procIdsToSendTo_ = procsFrom_;
634  reversePlan_->startsTo_ = startsFrom_;
635  reversePlan_->lengthsTo_ = lengthsFrom_;
636  reversePlan_->maxSendLength_ = maxReceiveLength;
637  reversePlan_->indicesTo_ = indicesFrom_;
638  reversePlan_->numReceives_ = numSendsToOtherProcs_;
639  reversePlan_->totalReceiveLength_ = totalSendLength;
640  reversePlan_->lengthsFrom_ = lengthsTo_;
641  reversePlan_->procsFrom_ = procIdsToSendTo_;
642  reversePlan_->startsFrom_ = startsTo_;
643  reversePlan_->indicesFrom_ = indicesTo_;
644 
645 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
646  // is there a smarter way to do this
647  reversePlan_->initializeMpiAdvance();
648 #endif
649 }
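// [Editorial note, not in the original source] The reverse plan simply
// exchanges the send and receive sides: if this plan sends 3 packets to
// rank 5 and receives 2 packets from rank 7, the reverse plan receives 3
// packets from rank 5 and sends 2 packets to rank 7, with lengths, starts,
// and process lists swapped accordingly.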
650 
651 void DistributorPlan::computeReceives()
652 {
653  using Teuchos::Array;
654  using Teuchos::ArrayRCP;
655  using Teuchos::as;
656  using Teuchos::CommStatus;
657  using Teuchos::CommRequest;
658  using Teuchos::ireceive;
659  using Teuchos::RCP;
660  using Teuchos::rcp;
661  using Teuchos::REDUCE_SUM;
662  using Teuchos::receive;
663  using Teuchos::reduce;
664  using Teuchos::scatter;
665  using Teuchos::send;
666  using Teuchos::waitAll;
667 
668  const int myRank = comm_->getRank();
669  const int numProcs = comm_->getSize();
670 
671  const int mpiTag = DEFAULT_MPI_TAG;
672 
673  // toProcsFromMe[i] == the number of messages sent by this process
674  // to process i. The data in numSendsToOtherProcs_, procIdsToSendTo_, and lengthsTo_
675  // concern the contiguous sends. Therefore, each process will be
676  // listed in procIdsToSendTo_ at most once, and so toProcsFromMe[i] will
677  // either be 0 or 1.
678  {
679  Array<int> toProcsFromMe (numProcs, 0);
680 #ifdef HAVE_TPETRA_DEBUG
681  bool counting_error = false;
682 #endif // HAVE_TPETRA_DEBUG
683  for (size_t i = 0; i < (numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0)); ++i) {
684 #ifdef HAVE_TPETRA_DEBUG
685  if (toProcsFromMe[procIdsToSendTo_[i]] != 0) {
686  counting_error = true;
687  }
688 #endif // HAVE_TPETRA_DEBUG
689  toProcsFromMe[procIdsToSendTo_[i]] = 1;
690  }
691 #ifdef HAVE_TPETRA_DEBUG
692  // Note that SHARED_TEST_FOR_EXCEPTION does a global reduction
693  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
694  "Tpetra::Distributor::computeReceives: There was an error on at least "
695  "one process in counting the number of messages sent by that process to "
696  "the other processes. Please report this bug to the Tpetra developers.",
697  *comm_);
698 #endif // HAVE_TPETRA_DEBUG
699 
700  // Compute the number of receives that this process needs to
701  // post. The number of receives includes any self sends (i.e.,
702  // messages sent by this process to itself).
703  //
704  // (We will use numReceives_ below to post exactly that
705  // number of receives, with MPI_ANY_SOURCE as the sending rank.
706  // This will tell us from which processes this process expects
707  // to receive, and how many packets of data we expect to receive
708  // from each process.)
709  //
710  // toProcsFromMe[i] is the number of messages sent by this
711  // process to process i. Compute the sum (elementwise) of all
712  // the toProcsFromMe arrays on all processes in the
713  // communicator. If the array x is that sum, then if this
714  // process has rank j, x[j] is the number of messages sent
715  // to process j, that is, the number of receives on process j
716  // (including any messages sent by process j to itself).
717  //
718  // Yes, this requires storing and operating on an array of
719  // length P, where P is the number of processes in the
720  // communicator. Epetra does this too. Avoiding this O(P)
721  // memory bottleneck would require some research.
722  //
723  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
724  // implement this O(P) memory algorithm.
725  //
726  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
727  // process (0) from toProcsFromMe, to numRecvsOnEachProc.
728  // Then, scatter the latter, so that each process p gets
729  // numRecvsOnEachProc[p].
730  //
731  // 2. Like #1, but use MPI_Reduce_scatter instead of
732  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
733  // optimized to reduce the number of messages, but
734  // MPI_Reduce_scatter is more general than we need (it
735  // allows the equivalent of MPI_Scatterv). See Bug 6336.
736  //
737  // 3. Do an all-reduce on toProcsFromMe, and let my process
738  // (with rank myRank) get numReceives_ from
739  // toProcsFromMe[myRank]. The HPCCG miniapp uses the
740  // all-reduce method.
741  //
742  // Approaches 1 and 3 have the same critical path length.
743  // However, #3 moves more data. This is because the final
744  // result is just one integer, but #3 moves a whole array of
745  // results to all the processes. This is why we use Approach 1
746  // here.
747  //
748  // mfh 12 Apr 2013: See discussion in createFromSends() about
749  // how we could use this communication to propagate an error
750  // flag for "free" in a release build.
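 //
 // [Editorial illustration, not in the original source] In raw MPI terms,
 // Approach 1 is roughly the following (rawMpiComm is hypothetical here):
 //
 //   int numRecvs = 0;
 //   MPI_Reduce(toProcsFromMe.getRawPtr(), numRecvsOnEachProc.getRawPtr(),
 //              numProcs, MPI_INT, MPI_SUM, 0, rawMpiComm);
 //   MPI_Scatter(numRecvsOnEachProc.getRawPtr(), 1, MPI_INT,
 //               &numRecvs, 1, MPI_INT, 0, rawMpiComm);
 //
 // The Teuchos::reduce and Teuchos::scatter calls below wrap this pattern.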
751 
752  const int root = 0; // rank of root process of the reduction
753  Array<int> numRecvsOnEachProc; // temp; only needed on root
754  if (myRank == root) {
755  numRecvsOnEachProc.resize (numProcs);
756  }
757  int numReceivesAsInt = 0; // output
758  reduce<int, int> (toProcsFromMe.getRawPtr (),
759  numRecvsOnEachProc.getRawPtr (),
760  numProcs, REDUCE_SUM, root, *comm_);
761  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
762  &numReceivesAsInt, 1, root, *comm_);
763  numReceives_ = static_cast<size_t> (numReceivesAsInt);
764  }
765 
766  // Now we know numReceives_, which is this process' number of
767  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
768  // with this number of entries.
769  lengthsFrom_.assign (numReceives_, 0);
770  procsFrom_.assign (numReceives_, 0);
771 
772  //
773  // Ask (via nonblocking receive) each process from which we are
774  // receiving how many packets we should expect from it in the
775  // communication pattern.
776  //
777 
778  // At this point, numReceives_ includes any self message that
779  // there may be. At the end of this routine, we'll subtract off
780  // the self message (if there is one) from numReceives_. In this
781  // routine, we don't need to receive a message from ourselves in
782  // order to figure out our lengthsFrom_ and source process ID; we
783  // can just ask ourselves directly. Thus, the actual number of
784  // nonblocking receives we post here does not include the self
785  // message.
786  const size_t actualNumReceives = numReceives_ - (sendMessageToSelf_ ? 1 : 0);
787 
788  // Teuchos' wrapper for nonblocking receives requires receive
789  // buffers that it knows won't go away. This is why we use RCPs,
790  // one RCP per nonblocking receive request. They get allocated in
791  // the loop below.
792  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
793  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
794  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
795 
796  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
797  // (receive data from any process).
798 #ifdef HAVE_MPI
799  const int anySourceProc = MPI_ANY_SOURCE;
800 #else
801  const int anySourceProc = -1;
802 #endif
803 
804  // Post the (nonblocking) receives.
805  for (size_t i = 0; i < actualNumReceives; ++i) {
806  // Once the receive completes, we can ask the corresponding
807  // CommStatus object (output by wait()) for the sending process'
808  // ID (which we'll assign to procsFrom_[i] -- don't forget to
809  // do that!).
810  lengthsFromBuffers[i].resize (1);
811  lengthsFromBuffers[i][0] = as<size_t> (0);
812  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
813  mpiTag, *comm_);
814  }
815 
816  // Post the sends: Tell each process to which we are sending how
817  // many packets it should expect from us in the communication
818  // pattern. We could use nonblocking sends here, as long as we do
819  // a waitAll() on all the sends and receives at once.
820  //
821  // We assume that numSendsToOtherProcs_ and sendMessageToSelf_ have already been
822  // set. The value of numSendsToOtherProcs_ (my process' number of sends) does
823  // not include any message that it might send to itself.
824  for (size_t i = 0; i < numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0); ++i) {
825  if (procIdsToSendTo_[i] != myRank) {
826  // Send a message to procIdsToSendTo_[i], telling that process that
827  // this communication pattern will send that process
828  // lengthsTo_[i] blocks of packets.
829  const size_t* const lengthsTo_i = &lengthsTo_[i];
830  send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), mpiTag, *comm_);
831  }
832  else {
833  // We don't need a send in the self-message case. If this
834  // process will send a message to itself in the communication
835  // pattern, then the last element of lengthsFrom_ and
836  // procsFrom_ corresponds to the self-message. Of course
837  // this process knows how long the message is, and the process
838  // ID is its own process ID.
839  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
840  procsFrom_[numReceives_-1] = myRank;
841  }
842  }
843 
844  //
845  // Wait on all the receives. When they arrive, check the status
846  // output of wait() for the receiving process ID, unpack the
847  // request buffers into lengthsFrom_, and set procsFrom_ from the
848  // status.
849  //
850  waitAll (*comm_, requests (), statuses ());
851  for (size_t i = 0; i < actualNumReceives; ++i) {
852  lengthsFrom_[i] = *lengthsFromBuffers[i];
853  procsFrom_[i] = statuses[i]->getSourceRank ();
854  }
855 
856  // Sort the procsFrom_ array, and apply the same permutation to
857  // lengthsFrom_. This ensures that procsFrom_[i] and
858  // lengthsFrom_[i] refer to the same thing.
859  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
860 
861  // Compute indicesFrom_
862  totalReceiveLength_ =
863  std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
864  indicesFrom_.clear ();
865 
866  startsFrom_.clear ();
867  startsFrom_.reserve (numReceives_);
868  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
869  startsFrom_.push_back(j);
870  j += lengthsFrom_[i];
871  }
872 
873  if (sendMessageToSelf_) {
874  --numReceives_;
875  }
876 }
877 
878 void DistributorPlan::setParameterList(const Teuchos::RCP<Teuchos::ParameterList>& plist)
879 {
880  using Teuchos::FancyOStream;
881  using Teuchos::getIntegralValue;
882  using Teuchos::ParameterList;
883  using Teuchos::parameterList;
884  using Teuchos::RCP;
885  using std::endl;
886 
887  if (! plist.is_null()) {
888  RCP<const ParameterList> validParams = getValidParameters ();
889  plist->validateParametersAndSetDefaults (*validParams);
890 
891  const Details::EDistributorSendType sendType =
892  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
893 
894  // Now that we've validated the input list, save the results.
895  sendType_ = sendType;
896 
897  #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
898  initializeMpiAdvance();
899  #endif
900 
901  // ParameterListAcceptor semantics require pointer identity of the
902  // sublist passed to setParameterList(), so we save the pointer.
903  this->setMyParamList (plist);
904  }
905 }
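// [Editorial illustration, not in the original source] Selecting the MPI
// send variant at run time through the ParameterList interface; "plan" is
// a hypothetical DistributorPlan, and the valid value strings are those
// returned by distributorSendTypes() below:
//
//   Teuchos::RCP<Teuchos::ParameterList> plist = Teuchos::parameterList();
//   plist->set("Send type", "Isend");   // or "Send", "Alltoall", ...
//   plan.setParameterList(plist);       // validates, then stores sendType_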
906 
907 Teuchos::Array<std::string> distributorSendTypes()
908 {
909  Teuchos::Array<std::string> sendTypes;
910  sendTypes.push_back ("Isend");
911  sendTypes.push_back ("Send");
912  sendTypes.push_back ("Alltoall");
913 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
914  sendTypes.push_back ("MpiAdvanceAlltoall");
915  sendTypes.push_back ("MpiAdvanceNbralltoallv");
916 #endif
917  return sendTypes;
918 }
919 
920 Teuchos::Array<EDistributorSendType> distributorSendTypeEnums() {
921  Teuchos::Array<EDistributorSendType> res;
922  res.push_back (DISTRIBUTOR_ISEND);
923  res.push_back (DISTRIBUTOR_SEND);
924  res.push_back (DISTRIBUTOR_ALLTOALL);
925 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
926  res.push_back (DISTRIBUTOR_MPIADVANCE_ALLTOALL);
927  res.push_back (DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV);
928 #endif
929  return res;
930 }
931 
932 Teuchos::RCP<const Teuchos::ParameterList>
933 DistributorPlan::getValidParameters() const
934 {
935  using Teuchos::Array;
936  using Teuchos::ParameterList;
937  using Teuchos::parameterList;
938  using Teuchos::RCP;
939  using Teuchos::setStringToIntegralParameter;
940 
941  Array<std::string> sendTypes = distributorSendTypes ();
942  const Array<Details::EDistributorSendType> sendTypeEnums = distributorSendTypeEnums ();
943 
944  const std::string validatedSendType = validSendTypeOrThrow(Behavior::defaultSendType());
945 
946  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
947 
948  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
949  validatedSendType, "When using MPI, the variant of send to use in "
950  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
951  plist->set ("Timer Label","","Label for Time Monitor output");
952 
953  return Teuchos::rcp_const_cast<const ParameterList> (plist);
954 }
955 
956 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
957 
958 // Used by Teuchos::RCP to clean up an owned MPIX_Comm*
959 struct MpixCommDeallocator {
960  void free(MPIX_Comm **comm) const {
961  MPIX_Comm_free(comm);
962  }
963 };
964 
965 void DistributorPlan::initializeMpiAdvance() {
966 
967  // Assert that the MPIX communicator is null; if it is not, something has gone wrong and we need to find out why.
968  TEUCHOS_ASSERT(mpixComm_.is_null());
969 
970  // Use the members to initialize the graph for neighborhood mode, or just the MPIX communicator for non-neighborhood mode.
971  Teuchos::RCP<const Teuchos::MpiComm<int> > mpiComm = Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int> >(comm_);
972  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm> > rawComm = mpiComm->getRawMpiComm();
973  int err = 0;
974  if (sendType_ == DISTRIBUTOR_MPIADVANCE_ALLTOALL ||
975  sendType_ == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV ) {
976  MPIX_Comm **mpixComm = new(MPIX_Comm*);
977  err = MPIX_Comm_init(mpixComm, (*rawComm)());
978  mpixComm_ = Teuchos::RCP(mpixComm,
979  MpixCommDeallocator(),
980  true /*take ownership*/
981  );
982  }
983 
984  TEUCHOS_ASSERT(err == 0);
985 }
986 #endif
987 
988 
989  DistributorPlan::SubViewLimits DistributorPlan::getImportViewLimits(size_t numPackets) const {
990  const size_t actualNumReceives = getNumReceives() + (hasSelfMessage() ? 1 : 0);
991 
992  IndexView importStarts(actualNumReceives);
993  IndexView importLengths(actualNumReceives);
994 
995  size_t offset = 0;
996  for (size_t i = 0; i < actualNumReceives; ++i) {
997  importStarts[i] = offset;
998  offset += getLengthsFrom()[i] * numPackets;
999  importLengths[i] = getLengthsFrom()[i] * numPackets;
1000  }
1001  return std::make_pair(importStarts, importLengths);
1002  }
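 // [Editorial illustration, not in the original source] For example, with
 // getLengthsFrom() = {2, 3} and numPackets = 4, the receive buffer is
 // partitioned as importStarts = {0, 8} and importLengths = {8, 12}:
 // 2*4 packets from the first source process followed by 3*4 from the
 // second.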
1003 
1004  DistributorPlan::SubViewLimits DistributorPlan::getImportViewLimits(const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) const {
1005 
1006  const size_t actualNumReceives = getNumReceives() + (hasSelfMessage() ? 1 : 0);
1007 
1008  IndexView importStarts(actualNumReceives);
1009  IndexView importLengths(actualNumReceives);
1010 
1011  size_t offset = 0;
1012  size_t curLIDoffset = 0;
1013  for (size_t i = 0; i < actualNumReceives; ++i) {
1014  size_t totalPacketsFrom_i = 0;
1015  for (size_t j = 0; j < getLengthsFrom()[i]; ++j) {
1016  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
1017  }
1018  curLIDoffset += getLengthsFrom()[i];
1019  importStarts[i] = offset;
1020  offset += totalPacketsFrom_i;
1021  importLengths[i] = totalPacketsFrom_i;
1022  }
1023  return std::make_pair(importStarts, importLengths);
1024  }
1025 
1026 
1027  DistributorPlan::SubViewLimits DistributorPlan::getExportViewLimits(size_t numPackets) const {
1028  if (getIndicesTo().is_null()) {
1029 
1030  const size_t actualNumSends = getNumSends() + (hasSelfMessage() ? 1 : 0);
1031  IndexView exportStarts(actualNumSends);
1032  IndexView exportLengths(actualNumSends);
1033  for (size_t pp = 0; pp < actualNumSends; ++pp) {
1034  exportStarts[pp] = getStartsTo()[pp] * numPackets;
1035  exportLengths[pp] = getLengthsTo()[pp] * numPackets;
1036  }
1037  return std::make_pair(exportStarts, exportLengths);
1038  } else {
1039  const size_t numIndices = getIndicesTo().size();
1040  IndexView exportStarts(numIndices);
1041  IndexView exportLengths(numIndices);
1042  for (size_t j = 0; j < numIndices; ++j) {
1043  exportStarts[j] = getIndicesTo()[j]*numPackets;
1044  exportLengths[j] = numPackets;
1045  }
1046  return std::make_pair(exportStarts, exportLengths);
1047  }
1048  }
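 // [Editorial illustration, not in the original source] In the contiguous
 // case, getStartsTo() = {0, 2}, getLengthsTo() = {2, 3}, and
 // numPackets = 4 yield exportStarts = {0, 8} and exportLengths = {8, 12}.
 // In the indicesTo_ case, export j occupies numPackets entries starting
 // at getIndicesTo()[j] * numPackets.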
1049 
1050  DistributorPlan::SubViewLimits DistributorPlan::getExportViewLimits(const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID) const {
1051  if (getIndicesTo().is_null()) {
1052  const size_t actualNumSends = getNumSends() + (hasSelfMessage() ? 1 : 0);
1053  IndexView exportStarts(actualNumSends);
1054  IndexView exportLengths(actualNumSends);
1055  size_t offset = 0;
1056  for (size_t pp = 0; pp < actualNumSends; ++pp) {
1057  size_t numPackets = 0;
1058  for (size_t j = getStartsTo()[pp];
1059  j < getStartsTo()[pp] + getLengthsTo()[pp]; ++j) {
1060  numPackets += numExportPacketsPerLID[j];
1061  }
1062  exportStarts[pp] = offset;
1063  offset += numPackets;
1064  exportLengths[pp] = numPackets;
1065  }
1066  return std::make_pair(exportStarts, exportLengths);
1067  } else {
1068  const size_t numIndices = getIndicesTo().size();
1069  IndexView exportStarts(numIndices);
1070  IndexView exportLengths(numIndices);
1071  size_t offset = 0;
1072  for (size_t j = 0; j < numIndices; ++j) {
1073  exportStarts[j] = offset;
1074  offset += numExportPacketsPerLID[j];
1075  exportLengths[j] = numExportPacketsPerLID[j];
1076  }
1077  return std::make_pair(exportStarts, exportLengths);
1078  }
1079  }
1080 
1081 }
1082 }