Tpetra_Details_DistributorPlan.cpp
1 // ***********************************************************************
2 //
3 // Tpetra: Templated Linear Algebra Services Package
4 // Copyright (2008) Sandia Corporation
5 //
6 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7 // the U.S. Government retains certain rights in this software.
8 //
9 // Redistribution and use in source and binary forms, with or without
10 // modification, are permitted provided that the following conditions are
11 // met:
12 //
13 // 1. Redistributions of source code must retain the above copyright
14 // notice, this list of conditions and the following disclaimer.
15 //
16 // 2. Redistributions in binary form must reproduce the above copyright
17 // notice, this list of conditions and the following disclaimer in the
18 // documentation and/or other materials provided with the distribution.
19 //
20 // 3. Neither the name of the Corporation nor the names of the
21 // contributors may be used to endorse or promote products derived from
22 // this software without specific prior written permission.
23 //
24 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
25 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
28 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 //
36 // ************************************************************************
37 // @HEADER
38 
39 #include "Tpetra_Details_DistributorPlan.hpp"
40 
41 #include "Teuchos_StandardParameterEntryValidators.hpp"
42 #include "Tpetra_Util.hpp"
43 #include "Tpetra_Details_Behavior.hpp"
44 #include <numeric>
45 
46 namespace Tpetra {
47 namespace Details {
48 
49 std::string
50 DistributorSendTypeEnumToString (EDistributorSendType sendType)
51 {
52  if (sendType == DISTRIBUTOR_ISEND) {
53  return "Isend";
54  }
55  else if (sendType == DISTRIBUTOR_SEND) {
56  return "Send";
57  }
58  else if (sendType == DISTRIBUTOR_ALLTOALL) {
59  return "Alltoall";
60  }
61 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
62  else if (sendType == DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
63  return "MpiAdvanceAlltoall";
64  }
65  else if (sendType == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
66  return "MpiAdvanceNbralltoallv";
67  }
68 #endif
69  else {
70  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
71  "EDistributorSendType enum value " << sendType << ".");
72  }
73 }
74 
75 std::string
76 DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
77 {
78  switch (how) {
79  case Details::DISTRIBUTOR_NOT_INITIALIZED:
80  return "Not initialized yet";
81  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
82  return "By createFromSends";
83  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
84  return "By createFromRecvs";
85  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
86  return "By createFromSendsAndRecvs";
87  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
88  return "By createReverseDistributor";
89  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
90  return "By copy constructor";
91  default:
92  return "INVALID";
93  }
94 }
95 
96 DistributorPlan::DistributorPlan(Teuchos::RCP<const Teuchos::Comm<int>> comm)
97  : comm_(comm),
98 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
99  mpixComm_(Teuchos::null),
100 #endif
101  howInitialized_(DISTRIBUTOR_NOT_INITIALIZED),
102  reversePlan_(Teuchos::null),
103  sendType_(DISTRIBUTOR_SEND),
104  sendMessageToSelf_(false),
105  numSendsToOtherProcs_(0),
106  maxSendLength_(0),
107  numReceives_(0),
108  totalReceiveLength_(0)
109 { }
110 
111 DistributorPlan::DistributorPlan(const DistributorPlan& otherPlan)
112  : comm_(otherPlan.comm_),
113 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
114  mpixComm_(otherPlan.mpixComm_),
115 #endif
116  howInitialized_(DISTRIBUTOR_INITIALIZED_BY_COPY),
117  reversePlan_(otherPlan.reversePlan_),
118  sendType_(otherPlan.sendType_),
119  sendMessageToSelf_(otherPlan.sendMessageToSelf_),
120  numSendsToOtherProcs_(otherPlan.numSendsToOtherProcs_),
121  procIdsToSendTo_(otherPlan.procIdsToSendTo_),
122  startsTo_(otherPlan.startsTo_),
123  lengthsTo_(otherPlan.lengthsTo_),
124  maxSendLength_(otherPlan.maxSendLength_),
125  indicesTo_(otherPlan.indicesTo_),
126  numReceives_(otherPlan.numReceives_),
127  totalReceiveLength_(otherPlan.totalReceiveLength_),
128  lengthsFrom_(otherPlan.lengthsFrom_),
129  procsFrom_(otherPlan.procsFrom_),
130  startsFrom_(otherPlan.startsFrom_),
131  indicesFrom_(otherPlan.indicesFrom_)
132 { }
133 
134 size_t DistributorPlan::createFromSends(const Teuchos::ArrayView<const int>& exportProcIDs) {
135  using Teuchos::outArg;
136  using Teuchos::REDUCE_MAX;
137  using Teuchos::reduceAll;
138  using std::endl;
139  const char rawPrefix[] = "Tpetra::DistributorPlan::createFromSends";
140 
141  const size_t numExports = exportProcIDs.size();
142  const int myProcID = comm_->getRank();
143  const int numProcs = comm_->getSize();
144  const bool debug = Details::Behavior::debug("Distributor");
145 
146  // exportProcIDs tells us the communication pattern for this
147  // distributor. It dictates the way that the export data will be
148  // interpreted in doPosts(). We want to perform at most one
149  // send per process in doPosts; this is for two reasons:
150  // * minimize latency / overhead in the comm routines (nice)
151  // * match the number of receives and sends between processes
152  // (necessary)
153  //
154  // Teuchos::Comm requires that the data for a send are contiguous
155  // in a send buffer. Therefore, if the data in the send buffer
156  // for doPosts() are not contiguous, they will need to be copied
157  // into a contiguous buffer. The user has specified this
158  // noncontiguous pattern and we can't do anything about it.
159  // However, if they do not provide an efficient pattern, we will
160  // warn them if the following compile-time option has been
161  // set:
162  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
163  //
164  // If the data are contiguous, then we can post the sends in situ
165  // (i.e., without needing to copy them into a send buffer).
166  //
167  // Determine contiguity. There are a number of ways to do this:
168  // * If the export IDs are sorted, then all exports to a
169  // particular proc must be contiguous. This is what Epetra does.
170  // * If the export ID of the current export already has been
171  // listed, then the previous listing should correspond to the
172  // same export. This tests contiguity, but not sortedness.
173  //
174  // Both of these tests require O(n), where n is the number of
175  // exports. However, the latter will positively identify a greater
176  // portion of contiguous patterns. We use the latter method.
177  //
178  // Check to see if values are grouped by procs without gaps
179  // If so, indices_to -> 0.
180 
181  if (debug) {
182  // Test whether any process in the communicator got an invalid
183  // process ID. If badID != -1 on this process, then it equals
184  // this process' rank. The max of all badID over all processes
185  // is the max rank which has an invalid process ID.
186  int badID = -1;
187  for (size_t i = 0; i < numExports; ++i) {
188  const int exportID = exportProcIDs[i];
189  if (exportID >= numProcs || exportID < 0) {
190  badID = myProcID;
191  break;
192  }
193  }
194  int gbl_badID;
195  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
196  TEUCHOS_TEST_FOR_EXCEPTION
197  (gbl_badID >= 0, std::runtime_error, rawPrefix << "Proc "
198  << gbl_badID << ", perhaps among other processes, got a bad "
199  "send process ID.");
200  }
201 
202  // Set up data structures for quick traversal of arrays.
203  // This contains the number of sends for each process ID.
204  //
205  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
206  // that create an array of length the number of processes in the
207  // communicator (plus one). Given how this code uses this array,
208  // it should be straightforward to replace it with a hash table or
209  // some other more space-efficient data structure. In practice,
210  // most of the entries of starts should be zero for a sufficiently
211  // large process count, unless the communication pattern is dense.
212  // Note that it's important to be able to iterate through keys (i
213  // for which starts[i] is nonzero) in increasing order.
214  Teuchos::Array<size_t> starts (numProcs + 1, 0);
215 
216  // numActive is the number of sends that are not Null
217  size_t numActive = 0;
218  int needSendBuff = 0; // Boolean
219 
220  for (size_t i = 0; i < numExports; ++i) {
221  const int exportID = exportProcIDs[i];
222  if (exportID >= 0) {
223  // exportID is a valid process ID. Increment the number of
224  // messages this process will send to that process.
225  ++starts[exportID];
226 
227  // If we're sending more than one message to process exportID,
228  // then it is possible that the data are not contiguous.
229  // Check by seeing if the previous process ID in the list
230  // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
231  // because if starts[exportID] > 1, then i must be > 1 (since
232  // the starts array was filled with zeros initially).
233 
234  // null entries break continuity.
235  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
236  if (needSendBuff == 0 && starts[exportID] > 1 &&
237  exportID != exportProcIDs[i-1]) {
238  needSendBuff = 1;
239  }
240  ++numActive;
241  }
242  }
243 
244 #if defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
245  {
246  int global_needSendBuff;
247  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
248  outArg (global_needSendBuff));
249  TPETRA_EFFICIENCY_WARNING(
250  global_needSendBuff != 0,
251  "::createFromSends: Grouping export IDs together by process rank often "
252  "improves performance.");
253  }
254 #endif
255 
256  // Determine from the caller's data whether or not the current
257  // process should send (a) message(s) to itself.
258  if (starts[myProcID] != 0) {
259  sendMessageToSelf_ = true;
260  }
261  else {
262  sendMessageToSelf_ = false;
263  }
264 
265  if (! needSendBuff) {
266  // grouped by proc, no send buffer or indicesTo_ needed
267  numSendsToOtherProcs_ = 0;
268  // Count total number of sends, i.e., total number of procs to
269  // which we are sending. This includes myself, if applicable.
270  for (int i = 0; i < numProcs; ++i) {
271  if (starts[i]) {
272  ++numSendsToOtherProcs_;
273  }
274  }
275 
276  // Not only do we not need these, but we must clear them, as
277  // empty status of indicesTo is a flag used later.
278  indicesTo_.resize(0);
279  // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
280  // includes self sends. Set their values to zeros.
281  procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
282  startsTo_.assign(numSendsToOtherProcs_,0);
283  lengthsTo_.assign(numSendsToOtherProcs_,0);
284 
285  // set startsTo to the offset for each send (i.e., each proc ID)
286  // set procsTo to the proc ID for each send
287  // in interpreting this code, remember that we are assuming contiguity
288  // that is why index skips through the ranks
289  {
290  size_t procIndex = 0;
291  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
292  while (exportProcIDs[procIndex] < 0) {
293  ++procIndex; // skip all negative proc IDs
294  }
295  startsTo_[i] = procIndex;
296  int procID = exportProcIDs[procIndex];
297  procIdsToSendTo_[i] = procID;
298  procIndex += starts[procID];
299  }
300  }
301  // sort the startsTo and proc IDs together, in ascending order, according
302  // to proc IDs
303  if (numSendsToOtherProcs_ > 0) {
304  sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
305  }
306  // compute the maximum send length
307  maxSendLength_ = 0;
308  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
309  int procID = procIdsToSendTo_[i];
310  lengthsTo_[i] = starts[procID];
311  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
312  maxSendLength_ = lengthsTo_[i];
313  }
314  }
315  }
316  else {
317  // not grouped by proc, need send buffer and indicesTo_
318 
319  // starts[i] is the number of sends to proc i
320  // numActive equals number of sends total, \sum_i starts[i]
321 
322  // this loop starts at starts[1], so explicitly check starts[0]
323  if (starts[0] == 0 ) {
324  numSendsToOtherProcs_ = 0;
325  }
326  else {
327  numSendsToOtherProcs_ = 1;
328  }
329  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
330  im1=starts.begin();
331  i != starts.end(); ++i)
332  {
333  if (*i != 0) ++numSendsToOtherProcs_;
334  *i += *im1;
335  im1 = i;
336  }
337  // starts[i] now contains the number of exports to procs 0 through i
338 
339  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
340  i=starts.rbegin()+1;
341  i != starts.rend(); ++i)
342  {
343  *ip1 = *i;
344  ip1 = i;
345  }
346  starts[0] = 0;
347  // starts[i] now contains the number of exports to procs 0 through
348  // i-1, i.e., all procs before proc i
349 
350  indicesTo_.resize(numActive);
351 
352  for (size_t i = 0; i < numExports; ++i) {
353  if (exportProcIDs[i] >= 0) {
354  // record the offset to the sendBuffer for this export
355  indicesTo_[starts[exportProcIDs[i]]] = i;
356  // now increment the offset for this proc
357  ++starts[exportProcIDs[i]];
358  }
359  }
360  // our send buffer will contain the export data for each of the procs
361  // we communicate with, in order by proc id
362  // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
363  // indicesTo now maps each export to the location in our send buffer
364  // associated with the export
365  // data for export i located at sendBuffer[indicesTo[i]]
366  //
367  // starts[i] once again contains the number of exports to
368  // procs 0 through i
369  for (int proc = numProcs-1; proc != 0; --proc) {
370  starts[proc] = starts[proc-1];
371  }
372  starts.front() = 0;
373  starts[numProcs] = numActive;
374  //
375  // starts[proc] once again contains the number of exports to
376  // procs 0 through proc-1
377  // i.e., the start of my data in the sendBuffer
378 
379  // this contains invalid data at procs we don't care about, that is okay
380  procIdsToSendTo_.resize(numSendsToOtherProcs_);
381  startsTo_.resize(numSendsToOtherProcs_);
382  lengthsTo_.resize(numSendsToOtherProcs_);
383 
384  // for each group of sends/exports, record the destination proc,
385  // the length, and the offset for this send into the
386  // send buffer (startsTo_)
387  maxSendLength_ = 0;
388  size_t snd = 0;
389  for (int proc = 0; proc < numProcs; ++proc ) {
390  if (starts[proc+1] != starts[proc]) {
391  lengthsTo_[snd] = starts[proc+1] - starts[proc];
392  startsTo_[snd] = starts[proc];
393  // record max length for all off-proc sends
394  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
395  maxSendLength_ = lengthsTo_[snd];
396  }
397  procIdsToSendTo_[snd] = proc;
398  ++snd;
399  }
400  }
401  }
402 
403  if (sendMessageToSelf_) {
404  --numSendsToOtherProcs_;
405  }
406 
407  // Invert map to see what msgs are received and what length
408  computeReceives();
409 
410 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
411  initializeMpiAdvance();
412 #endif
413 
414  // createFromRecvs() calls createFromSends(), but will set
415  // howInitialized_ again after calling createFromSends().
416  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
417 
418  return totalReceiveLength_;
419 }
420 
421 void DistributorPlan::createFromRecvs(const Teuchos::ArrayView<const int>& remoteProcIDs)
422 {
423  createFromSends(remoteProcIDs);
424  *this = *getReversePlan();
425  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
426 }
427 
428 void DistributorPlan::createFromSendsAndRecvs(const Teuchos::ArrayView<const int>& exportProcIDs,
429  const Teuchos::ArrayView<const int>& remoteProcIDs)
430 {
431  // Note that exportProcIDs and remoteProcIDs _must_ be lists that have
432  // an entry for each GID. If the export/remote proc IDs are taken from
433  // the getProcs{From|To} lists extracted from a previous distributor,
434  // this will generate a wrong answer, because those lists have a unique
435  // entry for each process ID. A version of this that takes lengthsTo and
436  // lengthsFrom should be written.
437 
438  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
439 
440  int myProcID = comm_->getRank ();
441  int numProcs = comm_->getSize();
442 
443  const size_t numExportIDs = exportProcIDs.size();
444  Teuchos::Array<size_t> starts (numProcs + 1, 0);
445 
446  size_t numActive = 0;
447  int needSendBuff = 0; // Boolean
448 
449  for(size_t i = 0; i < numExportIDs; i++ )
450  {
451  if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
452  needSendBuff = 1;
453  if( exportProcIDs[i] >= 0 )
454  {
455  ++starts[ exportProcIDs[i] ];
456  ++numActive;
457  }
458  }
459 
460  sendMessageToSelf_ = ( starts[myProcID] != 0 ) ? 1 : 0;
461 
462  numSendsToOtherProcs_ = 0;
463 
464  if( needSendBuff ) // not grouped by proc: need a send buffer and indicesTo_
465  {
466  if (starts[0] == 0 ) {
467  numSendsToOtherProcs_ = 0;
468  }
469  else {
470  numSendsToOtherProcs_ = 1;
471  }
472  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
473  im1=starts.begin();
474  i != starts.end(); ++i)
475  {
476  if (*i != 0) ++numSendsToOtherProcs_;
477  *i += *im1;
478  im1 = i;
479  }
480  // starts[i] now contains the number of exports to procs 0 through i
481 
482  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
483  i=starts.rbegin()+1;
484  i != starts.rend(); ++i)
485  {
486  *ip1 = *i;
487  ip1 = i;
488  }
489  starts[0] = 0;
490  // starts[i] now contains the number of exports to procs 0 through
491  // i-1, i.e., all procs before proc i
492 
493  indicesTo_.resize(numActive);
494 
495  for (size_t i = 0; i < numExportIDs; ++i) {
496  if (exportProcIDs[i] >= 0) {
497  // record the offset to the sendBuffer for this export
498  indicesTo_[starts[exportProcIDs[i]]] = i;
499  // now increment the offset for this proc
500  ++starts[exportProcIDs[i]];
501  }
502  }
503  for (int proc = numProcs-1; proc != 0; --proc) {
504  starts[proc] = starts[proc-1];
505  }
506  starts.front() = 0;
507  starts[numProcs] = numActive;
508  procIdsToSendTo_.resize(numSendsToOtherProcs_);
509  startsTo_.resize(numSendsToOtherProcs_);
510  lengthsTo_.resize(numSendsToOtherProcs_);
511  maxSendLength_ = 0;
512  size_t snd = 0;
513  for (int proc = 0; proc < numProcs; ++proc ) {
514  if (starts[proc+1] != starts[proc]) {
515  lengthsTo_[snd] = starts[proc+1] - starts[proc];
516  startsTo_[snd] = starts[proc];
517  // record max length for all off-proc sends
518  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
519  maxSendLength_ = lengthsTo_[snd];
520  }
521  procIdsToSendTo_[snd] = proc;
522  ++snd;
523  }
524  }
525  }
526  else {
527  // grouped by proc, no send buffer or indicesTo_ needed
528  numSendsToOtherProcs_ = 0;
529  // Count total number of sends, i.e., total number of procs to
530  // which we are sending. This includes myself, if applicable.
531  for (int i = 0; i < numProcs; ++i) {
532  if (starts[i]) {
533  ++numSendsToOtherProcs_;
534  }
535  }
536 
537  // Not only do we not need these, but we must clear them, as
538  // empty status of indicesTo is a flag used later.
539  indicesTo_.resize(0);
540  // Size these to numSendsToOtherProcs_; note, at the moment, numSendsToOtherProcs_
541  // includes self sends. Set their values to zeros.
542  procIdsToSendTo_.assign(numSendsToOtherProcs_,0);
543  startsTo_.assign(numSendsToOtherProcs_,0);
544  lengthsTo_.assign(numSendsToOtherProcs_,0);
545 
546  // set startsTo to the offset for each send (i.e., each proc ID)
547  // set procsTo to the proc ID for each send
548  // in interpreting this code, remember that we are assuming contiguity
549  // that is why index skips through the ranks
550  {
551  size_t procIndex = 0;
552  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
553  while (exportProcIDs[procIndex] < 0) {
554  ++procIndex; // skip all negative proc IDs
555  }
556  startsTo_[i] = procIndex;
557  int procID = exportProcIDs[procIndex];
558  procIdsToSendTo_[i] = procID;
559  procIndex += starts[procID];
560  }
561  }
562  // sort the startsTo and proc IDs together, in ascending order, according
563  // to proc IDs
564  if (numSendsToOtherProcs_ > 0) {
565  sort2(procIdsToSendTo_.begin(), procIdsToSendTo_.end(), startsTo_.begin());
566  }
567  // compute the maximum send length
568  maxSendLength_ = 0;
569  for (size_t i = 0; i < numSendsToOtherProcs_; ++i) {
570  int procID = procIdsToSendTo_[i];
571  lengthsTo_[i] = starts[procID];
572  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
573  maxSendLength_ = lengthsTo_[i];
574  }
575  }
576  }
577 
578 
579  numSendsToOtherProcs_ -= sendMessageToSelf_;
580  std::vector<int> recv_list;
581  recv_list.reserve(numSendsToOtherProcs_); //reserve an initial guess for size needed
582 
583  int last_pid=-2;
584  for(int i=0; i<remoteProcIDs.size(); i++) {
585  if(remoteProcIDs[i]>last_pid) {
586  recv_list.push_back(remoteProcIDs[i]);
587  last_pid = remoteProcIDs[i];
588  }
589  else if (remoteProcIDs[i]<last_pid)
590  throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
591  }
592  numReceives_ = recv_list.size();
593  if(numReceives_) {
594  procsFrom_.assign(numReceives_,0);
595  lengthsFrom_.assign(numReceives_,0);
596  indicesFrom_.assign(numReceives_,0);
597  startsFrom_.assign(numReceives_,0);
598  }
599  for(size_t i=0,j=0; i<numReceives_; ++i) {
600  int jlast=j;
601  procsFrom_[i] = recv_list[i];
602  startsFrom_[i] = j;
603  for( ; j<(size_t)remoteProcIDs.size() &&
604  remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
605  lengthsFrom_[i] = j-jlast;
606  }
607  totalReceiveLength_ = remoteProcIDs.size();
608  indicesFrom_.clear ();
609  numReceives_-=sendMessageToSelf_;
610 
611 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
612  initializeMpiAdvance();
613 #endif
614 }
615 
616 Teuchos::RCP<DistributorPlan> DistributorPlan::getReversePlan() const {
617  if (reversePlan_.is_null()) createReversePlan();
618  return reversePlan_;
619 }
620 
621 void DistributorPlan::createReversePlan() const
622 {
623  reversePlan_ = Teuchos::rcp(new DistributorPlan(comm_));
624  reversePlan_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
625  reversePlan_->sendType_ = sendType_;
626 
627  // The total length of all the sends of this DistributorPlan. We
628  // calculate it because it's the total length of all the receives
629  // of the reverse DistributorPlan.
630  size_t totalSendLength =
631  std::accumulate(lengthsTo_.begin(), lengthsTo_.end(), 0);
632 
633  // The maximum length of any of the receives of this DistributorPlan.
634  // We calculate it because it's the maximum length of any of the
635  // sends of the reverse DistributorPlan.
636  size_t maxReceiveLength = 0;
637  const int myProcID = comm_->getRank();
638  for (size_t i=0; i < numReceives_; ++i) {
639  if (procsFrom_[i] != myProcID) {
640  // Don't count receives for messages sent by myself to myself.
641  if (lengthsFrom_[i] > maxReceiveLength) {
642  maxReceiveLength = lengthsFrom_[i];
643  }
644  }
645  }
646 
647  reversePlan_->sendMessageToSelf_ = sendMessageToSelf_;
648  reversePlan_->numSendsToOtherProcs_ = numReceives_;
649  reversePlan_->procIdsToSendTo_ = procsFrom_;
650  reversePlan_->startsTo_ = startsFrom_;
651  reversePlan_->lengthsTo_ = lengthsFrom_;
652  reversePlan_->maxSendLength_ = maxReceiveLength;
653  reversePlan_->indicesTo_ = indicesFrom_;
654  reversePlan_->numReceives_ = numSendsToOtherProcs_;
655  reversePlan_->totalReceiveLength_ = totalSendLength;
656  reversePlan_->lengthsFrom_ = lengthsTo_;
657  reversePlan_->procsFrom_ = procIdsToSendTo_;
658  reversePlan_->startsFrom_ = startsTo_;
659  reversePlan_->indicesFrom_ = indicesTo_;
660 
661 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
662  // is there a smarter way to do this
663  reversePlan_->initializeMpiAdvance();
664 #endif
665 }
666 
667 void DistributorPlan::computeReceives()
668 {
669  using Teuchos::Array;
670  using Teuchos::ArrayRCP;
671  using Teuchos::as;
672  using Teuchos::CommStatus;
673  using Teuchos::CommRequest;
674  using Teuchos::ireceive;
675  using Teuchos::RCP;
676  using Teuchos::rcp;
677  using Teuchos::REDUCE_SUM;
678  using Teuchos::receive;
679  using Teuchos::reduce;
680  using Teuchos::scatter;
681  using Teuchos::send;
682  using Teuchos::waitAll;
683 
684  const int myRank = comm_->getRank();
685  const int numProcs = comm_->getSize();
686 
687  const int mpiTag = DEFAULT_MPI_TAG;
688 
689  // toProcsFromMe[i] == the number of messages sent by this process
690  // to process i. The data in numSendsToOtherProcs_, procIdsToSendTo_, and lengthsTo_
691  // concern the contiguous sends. Therefore, each process will be
692  // listed in procIdsToSendTo_ at most once, and so toProcsFromMe[i] will
693  // either be 0 or 1.
694  {
695  Array<int> toProcsFromMe (numProcs, 0);
696 #ifdef HAVE_TPETRA_DEBUG
697  bool counting_error = false;
698 #endif // HAVE_TPETRA_DEBUG
699  for (size_t i = 0; i < (numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0)); ++i) {
700 #ifdef HAVE_TPETRA_DEBUG
701  if (toProcsFromMe[procIdsToSendTo_[i]] != 0) {
702  counting_error = true;
703  }
704 #endif // HAVE_TPETRA_DEBUG
705  toProcsFromMe[procIdsToSendTo_[i]] = 1;
706  }
707 #ifdef HAVE_TPETRA_DEBUG
708  // Note that SHARED_TEST_FOR_EXCEPTION does a global reduction
709  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
710  "Tpetra::Distributor::computeReceives: There was an error on at least "
711  "one process in counting the number of messages sent by that process to "
712  "the other processes. Please report this bug to the Tpetra developers.",
713  *comm_);
714 #endif // HAVE_TPETRA_DEBUG
715 
716  // Compute the number of receives that this process needs to
717  // post. The number of receives includes any self sends (i.e.,
718  // messages sent by this process to itself).
719  //
720  // (We will use numReceives_ below to post exactly that
721  // number of receives, with MPI_ANY_SOURCE as the sending rank.
722  // This will tell us from which processes this process expects
723  // to receive, and how many packets of data we expect to receive
724  // from each process.)
725  //
726  // toProcsFromMe[i] is the number of messages sent by this
727  // process to process i. Compute the sum (elementwise) of all
728  // the toProcsFromMe arrays on all processes in the
729  // communicator. If the array x is that sum, then if this
730  // process has rank j, x[j] is the number of messages sent
731  // to process j, that is, the number of receives on process j
732  // (including any messages sent by process j to itself).
733  //
734  // Yes, this requires storing and operating on an array of
735  // length P, where P is the number of processes in the
736  // communicator. Epetra does this too. Avoiding this O(P)
737  // memory bottleneck would require some research.
738  //
739  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
740  // implement this O(P) memory algorithm.
741  //
742  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
743  // process (0) from toProcsFromMe, to numRecvsOnEachProc.
744  // Then, scatter the latter, so that each process p gets
745  // numRecvsOnEachProc[p].
746  //
747  // 2. Like #1, but use MPI_Reduce_scatter instead of
748  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
749  // optimized to reduce the number of messages, but
750  // MPI_Reduce_scatter is more general than we need (it
751  // allows the equivalent of MPI_Scatterv). See Bug 6336.
752  //
753  // 3. Do an all-reduce on toProcsFromMe, and let my process
754  // (with rank myRank) get numReceives_ from
755  // toProcsFromMe[myRank]. The HPCCG miniapp uses the
756  // all-reduce method.
757  //
758  // Approaches 1 and 3 have the same critical path length.
759  // However, #3 moves more data. This is because the final
760  // result is just one integer, but #3 moves a whole array of
761  // results to all the processes. This is why we use Approach 1
762  // here.
763  //
764  // mfh 12 Apr 2013: See discussion in createFromSends() about
765  // how we could use this communication to propagate an error
766  // flag for "free" in a release build.
767 
768  const int root = 0; // rank of root process of the reduction
769  Array<int> numRecvsOnEachProc; // temp; only needed on root
770  if (myRank == root) {
771  numRecvsOnEachProc.resize (numProcs);
772  }
773  int numReceivesAsInt = 0; // output
774  reduce<int, int> (toProcsFromMe.getRawPtr (),
775  numRecvsOnEachProc.getRawPtr (),
776  numProcs, REDUCE_SUM, root, *comm_);
777  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
778  &numReceivesAsInt, 1, root, *comm_);
779  numReceives_ = static_cast<size_t> (numReceivesAsInt);
780  }
781 
782  // Now we know numReceives_, which is this process' number of
783  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
784  // with this number of entries.
785  lengthsFrom_.assign (numReceives_, 0);
786  procsFrom_.assign (numReceives_, 0);
787 
788  //
789  // Ask (via nonblocking receive) each process from which we are
790  // receiving how many packets we should expect from it in the
791  // communication pattern.
792  //
793 
794  // At this point, numReceives_ includes any self message that
795  // there may be. At the end of this routine, we'll subtract off
796  // the self message (if there is one) from numReceives_. In this
797  // routine, we don't need to receive a message from ourselves in
798  // order to figure out our lengthsFrom_ and source process ID; we
799  // can just ask ourselves directly. Thus, the actual number of
800  // nonblocking receives we post here does not include the self
801  // message.
802  const size_t actualNumReceives = numReceives_ - (sendMessageToSelf_ ? 1 : 0);
803 
804  // Teuchos' wrapper for nonblocking receives requires receive
805  // buffers that it knows won't go away. This is why we use RCPs,
806  // one RCP per nonblocking receive request. They get allocated in
807  // the loop below.
808  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
809  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
810  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
811 
812  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
813  // (receive data from any process).
814 #ifdef HAVE_MPI
815  const int anySourceProc = MPI_ANY_SOURCE;
816 #else
817  const int anySourceProc = -1;
818 #endif
819 
820  // Post the (nonblocking) receives.
821  for (size_t i = 0; i < actualNumReceives; ++i) {
822  // Once the receive completes, we can ask the corresponding
823  // CommStatus object (output by wait()) for the sending process'
824  // ID (which we'll assign to procsFrom_[i] -- don't forget to
825  // do that!).
826  lengthsFromBuffers[i].resize (1);
827  lengthsFromBuffers[i][0] = as<size_t> (0);
828  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
829  mpiTag, *comm_);
830  }
831 
832  // Post the sends: Tell each process to which we are sending how
833  // many packets it should expect from us in the communication
834  // pattern. We could use nonblocking sends here, as long as we do
835  // a waitAll() on all the sends and receives at once.
836  //
837  // We assume that numSendsToOtherProcs_ and sendMessageToSelf_ have already been
838  // set. The value of numSendsToOtherProcs_ (my process' number of sends) does
839  // not include any message that it might send to itself.
840  for (size_t i = 0; i < numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0); ++i) {
841  if (procIdsToSendTo_[i] != myRank) {
842  // Send a message to procIdsToSendTo_[i], telling that process that
843  // this communication pattern will send that process
844  // lengthsTo_[i] blocks of packets.
845  const size_t* const lengthsTo_i = &lengthsTo_[i];
846  send<int, size_t> (lengthsTo_i, 1, as<int> (procIdsToSendTo_[i]), mpiTag, *comm_);
847  }
848  else {
849  // We don't need a send in the self-message case. If this
850  // process will send a message to itself in the communication
851  // pattern, then the last element of lengthsFrom_ and
852  // procsFrom_ corresponds to the self-message. Of course
853  // this process knows how long the message is, and the process
854  // ID is its own process ID.
855  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
856  procsFrom_[numReceives_-1] = myRank;
857  }
858  }
859 
860  //
861  // Wait on all the receives. When they arrive, check the status
862  // output of wait() for the receiving process ID, unpack the
863  // request buffers into lengthsFrom_, and set procsFrom_ from the
864  // status.
865  //
866  waitAll (*comm_, requests (), statuses ());
867  for (size_t i = 0; i < actualNumReceives; ++i) {
868  lengthsFrom_[i] = *lengthsFromBuffers[i];
869  procsFrom_[i] = statuses[i]->getSourceRank ();
870  }
871 
872  // Sort the procsFrom_ array, and apply the same permutation to
873  // lengthsFrom_. This ensures that procsFrom_[i] and
874  // lengthsFrom_[i] refer to the same thing.
875  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
876 
877  // Compute indicesFrom_
878  totalReceiveLength_ =
879  std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
880  indicesFrom_.clear ();
881 
882  startsFrom_.clear ();
883  startsFrom_.reserve (numReceives_);
884  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
885  startsFrom_.push_back(j);
886  j += lengthsFrom_[i];
887  }
888 
889  if (sendMessageToSelf_) {
890  --numReceives_;
891  }
892 }
893 
894 void DistributorPlan::setParameterList(const Teuchos::RCP<Teuchos::ParameterList>& plist)
895 {
896  using Teuchos::FancyOStream;
897  using Teuchos::getIntegralValue;
898  using Teuchos::ParameterList;
899  using Teuchos::parameterList;
900  using Teuchos::RCP;
901  using std::endl;
902 
903  if (! plist.is_null()) {
904  RCP<const ParameterList> validParams = getValidParameters ();
905  plist->validateParametersAndSetDefaults (*validParams);
906 
907  const Details::EDistributorSendType sendType =
908  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
909 
910  // Now that we've validated the input list, save the results.
911  sendType_ = sendType;
912 
913  // ParameterListAcceptor semantics require pointer identity of the
914  // sublist passed to setParameterList(), so we save the pointer.
915  this->setMyParamList (plist);
916  }
917 }
918 
919 Teuchos::Array<std::string> distributorSendTypes()
920 {
921  Teuchos::Array<std::string> sendTypes;
922  sendTypes.push_back ("Isend");
923  sendTypes.push_back ("Send");
924  sendTypes.push_back ("Alltoall");
925 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
926  sendTypes.push_back ("MpiAdvanceAlltoall");
927  sendTypes.push_back ("MpiAdvanceNbralltoallv");
928 #endif
929  return sendTypes;
930 }
931 
932 Teuchos::RCP<const Teuchos::ParameterList>
933 DistributorPlan::getValidParameters() const
934 {
935  using Teuchos::Array;
936  using Teuchos::ParameterList;
937  using Teuchos::parameterList;
938  using Teuchos::RCP;
939  using Teuchos::setStringToIntegralParameter;
940 
941  Array<std::string> sendTypes = distributorSendTypes ();
942  const std::string defaultSendType ("Send");
943  Array<Details::EDistributorSendType> sendTypeEnums;
944  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
945  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
946  sendTypeEnums.push_back (Details::DISTRIBUTOR_ALLTOALL);
947 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
948  sendTypeEnums.push_back (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL);
949  sendTypeEnums.push_back (Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV);
950 #endif
951 
952  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
953 
954  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
955  defaultSendType, "When using MPI, the variant of send to use in "
956  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
957  plist->set ("Timer Label","","Label for Time Monitor output");
958 
959  return Teuchos::rcp_const_cast<const ParameterList> (plist);
960 }
961 
962 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
963 
964 // Used by Teuchos::RCP to clean up an owned MPIX_Comm*
965 struct MpixCommDeallocator {
966  void free(MPIX_Comm **comm) const {
967  MPIX_Comm_free(*comm);
968  }
969 };
970 
971 void DistributorPlan::initializeMpiAdvance() {
972 
973  // Assert that the MPIX communicator is null; if it is not, we need to figure out why.
974  TEUCHOS_ASSERT(mpixComm_.is_null());
975 
976  // Use the plan's members to initialize the graph for neighborhood mode, or just the MPIX communicator for non-neighborhood mode.
977  Teuchos::RCP<const Teuchos::MpiComm<int> > mpiComm = Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int> >(comm_);
978  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm> > rawComm = mpiComm->getRawMpiComm();
979  int err = 0;
980  if (sendType_ == DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
981  MPIX_Comm **mpixComm = new(MPIX_Comm*);
982  err = MPIX_Comm_init(mpixComm, (*rawComm)());
983  mpixComm_ = Teuchos::RCP(mpixComm,
984  MpixCommDeallocator(),
985  true /*take ownership*/
986  );
987  }
988  else if (sendType_ == DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
989  int numRecvs = (int)(numReceives_ + (sendMessageToSelf_ ? 1 : 0));
990  int *sourceRanks = procsFrom_.data();
991 
992  // int *sourceWeights = static_cast<int*>(lengthsFrom_.data());// lengthsFrom_ may not be int
993  const int *sourceWeights = MPI_UNWEIGHTED;
994  int numSends = (int)(numSendsToOtherProcs_ + (sendMessageToSelf_ ? 1 : 0));
995  int *destRanks = procIdsToSendTo_.data();
996 
997  // int *destWeights = static_cast<int*>(lengthsTo_.data()); // lengthsTo_ may not be int
998  const int *destWeights = MPI_UNWEIGHTED; // lengthsTo_ may not be int
999 
1000  MPIX_Comm **mpixComm = new(MPIX_Comm*);
1001  err = MPIX_Dist_graph_create_adjacent((*rawComm)(), numRecvs, sourceRanks, sourceWeights, numSends, destRanks, destWeights, MPI_INFO_NULL, false, mpixComm);
1002  mpixComm_ = Teuchos::RCP(mpixComm,
1003  MpixCommDeallocator(),
1004  true /*take ownership*/
1005  );
1006  }
1007 
1008  TEUCHOS_ASSERT(err == 0);
1009 }
1010 #endif
1011 
1012 }
1013 }
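
The listing above defines DistributorPlan and its plan-building methods. The following is a minimal sketch of driving createFromSends() directly, assuming the header Tpetra_Details_DistributorPlan.hpp, a Teuchos default communicator, and public access to the members shown in this file; the function name exampleCreateFromSends is illustrative only.

#include "Tpetra_Details_DistributorPlan.hpp"
#include "Teuchos_DefaultComm.hpp"
#include "Teuchos_Array.hpp"

void exampleCreateFromSends ()
{
  // MPI_COMM_WORLD when MPI is enabled, a serial comm otherwise.
  Teuchos::RCP<const Teuchos::Comm<int>> comm =
    Teuchos::DefaultComm<int>::getComm ();

  // One entry per export: entry i is the rank to which export i goes.
  // Keeping the entries grouped by rank keeps the send data contiguous,
  // so the plan does not need indicesTo_ or an intermediate send buffer.
  const int nextRank = (comm->getRank () + 1) % comm->getSize ();
  Teuchos::Array<int> exportProcIDs (3, nextRank);

  Tpetra::Details::DistributorPlan plan (comm);
  // Collective; every process must call it. Returns the total number of
  // entries this process will receive (3 here, from the previous rank).
  const size_t totalReceiveLength = plan.createFromSends (exportProcIDs ());
  (void) totalReceiveLength;
}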
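
setParameterList() and getValidParameters() above accept a "Send type" parameter whose valid string values come from distributorSendTypes(). A sketch of selecting a send type, assuming a DistributorPlan object as in the previous sketch and that setParameterList() is publicly accessible:

#include "Teuchos_ParameterList.hpp"

void exampleSetSendType (Tpetra::Details::DistributorPlan& plan)
{
  Teuchos::RCP<Teuchos::ParameterList> params = Teuchos::parameterList ();
  // Must be one of distributorSendTypes(): "Isend", "Send", "Alltoall",
  // plus the MPI Advance variants when HAVE_TPETRACORE_MPI_ADVANCE is defined.
  params->set ("Send type", std::string ("Isend"));
  plan.setParameterList (params); // validated against getValidParameters()
}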
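
createFromSends() decides whether a send buffer is needed by checking, for each export, whether a repeated destination rank immediately follows another export to the same rank. The stand-alone function below (hypothetical, not library code) reproduces that test on plain std::vector input to show which patterns count as contiguous.

#include <cassert>
#include <cstddef>
#include <vector>

bool needsSendBuffer (const std::vector<int>& exportProcIDs, int numProcs)
{
  std::vector<std::size_t> starts (numProcs + 1, 0);
  bool needSendBuff = false;
  for (std::size_t i = 0; i < exportProcIDs.size (); ++i) {
    const int exportID = exportProcIDs[i];
    if (exportID >= 0) {  // negative IDs are "null" exports
      ++starts[exportID];
      // Second or later export to exportID that does not immediately
      // follow another export to exportID: the data are not contiguous.
      if (! needSendBuff && starts[exportID] > 1 &&
          exportID != exportProcIDs[i-1]) {
        needSendBuff = true;
      }
    }
  }
  return needSendBuff;
}

int main ()
{
  assert (! needsSendBuffer ({0, 0, 0, 1, 1, 2, 2, 2}, 3));       // grouped by rank
  assert (  needsSendBuffer ({0, 0, 0, 1, -99, 1, 2, 2, 2}, 3));  // null entry splits the 1s
  return 0;
}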