Tpetra_Distributor.cpp
1 // ***********************************************************************
2 //
3 // Tpetra: Templated Linear Algebra Services Package
4 // Copyright (2008) Sandia Corporation
5 //
6 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7 // the U.S. Government retains certain rights in this software.
8 //
9 // Redistribution and use in source and binary forms, with or without
10 // modification, are permitted provided that the following conditions are
11 // met:
12 //
13 // 1. Redistributions of source code must retain the above copyright
14 // notice, this list of conditions and the following disclaimer.
15 //
16 // 2. Redistributions in binary form must reproduce the above copyright
17 // notice, this list of conditions and the following disclaimer in the
18 // documentation and/or other materials provided with the distribution.
19 //
20 // 3. Neither the name of the Corporation nor the names of the
21 // contributors may be used to endorse or promote products derived from
22 // this software without specific prior written permission.
23 //
24 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
25 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
28 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 //
36 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
37 //
38 // ************************************************************************
39 // @HEADER
40 
41 #include "Tpetra_Distributor.hpp"
42 #include "Tpetra_Details_gathervPrint.hpp"
43 #include "Tpetra_Details_makeValidVerboseStream.hpp"
44 #include "Teuchos_StandardParameterEntryValidators.hpp"
45 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
46 #include <numeric>
47 
48 namespace Tpetra {
49  namespace Details {
50  std::string
51  DistributorSendTypeEnumToString (EDistributorSendType sendType)
52  {
53  if (sendType == DISTRIBUTOR_ISEND) {
54  return "Isend";
55  }
56  else if (sendType == DISTRIBUTOR_RSEND) {
57  return "Rsend";
58  }
59  else if (sendType == DISTRIBUTOR_SEND) {
60  return "Send";
61  }
62  else if (sendType == DISTRIBUTOR_SSEND) {
63  return "Ssend";
64  }
65  else {
66  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
67  "EDistributorSendType enum value " << sendType << ".");
68  }
69  }
70 
71  std::string
72  DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
73  {
74  switch (how) {
75  case Details::DISTRIBUTOR_NOT_INITIALIZED:
76  return "Not initialized yet";
77  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
78  return "By createFromSends";
79  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
80  return "By createFromRecvs";
81  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
82  return "By createFromSendsAndRecvs";
83  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
84  return "By createReverseDistributor";
85  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
86  return "By copy constructor";
87  default:
88  return "INVALID";
89  }
90  }
91  } // namespace Details
92 
93  Teuchos::Array<std::string>
94  distributorSendTypes ()
95  {
96  Teuchos::Array<std::string> sendTypes;
97  sendTypes.push_back ("Isend");
98  sendTypes.push_back ("Rsend");
99  sendTypes.push_back ("Send");
100  sendTypes.push_back ("Ssend");
101  return sendTypes;
102  }
103 
104  // We set default values of Distributor's Boolean parameters here,
105  // in this one place. That way, if we want to change the default
106  // value of a parameter, we don't have to search the whole file to
107  // ensure a consistent setting.
108  namespace {
109  // Default value of the "Debug" parameter.
110  const bool tpetraDistributorDebugDefault = false;
111  // Default value of the "Barrier between receives and sends" parameter.
112  const bool barrierBetween_default = false;
113  // Default value of the "Use distinct tags" parameter.
114  const bool useDistinctTags_default = true;
115  } // namespace (anonymous)
116 
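 // getTag: when "Use distinct tags" is enabled (the default), each
 // communication path in this file uses its own MPI message tag (the
 // pathTag argument below); otherwise every path shares the
 // communicator's current tag.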
117  int Distributor::getTag (const int pathTag) const {
118  return useDistinctTags_ ? pathTag : comm_->getTag ();
119  }
120 
121 
122 #ifdef TPETRA_DISTRIBUTOR_TIMERS
123  void Distributor::makeTimers () {
124  const std::string name_doPosts3 = "Tpetra::Distributor: doPosts(3)";
125  const std::string name_doPosts4 = "Tpetra::Distributor: doPosts(4)";
126  const std::string name_doWaits = "Tpetra::Distributor: doWaits";
127  const std::string name_doPosts3_recvs = "Tpetra::Distributor: doPosts(3): recvs";
128  const std::string name_doPosts4_recvs = "Tpetra::Distributor: doPosts(4): recvs";
129  const std::string name_doPosts3_barrier = "Tpetra::Distributor: doPosts(3): barrier";
130  const std::string name_doPosts4_barrier = "Tpetra::Distributor: doPosts(4): barrier";
131  const std::string name_doPosts3_sends = "Tpetra::Distributor: doPosts(3): sends";
132  const std::string name_doPosts4_sends = "Tpetra::Distributor: doPosts(4): sends";
133 
134  timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
135  timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
136  timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
137  timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
138  timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
139  timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
140  timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
141  timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
142  timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
143  }
144 #endif // TPETRA_DISTRIBUTOR_TIMERS
145 
146  Distributor::
147  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
148  const Teuchos::RCP<Teuchos::FancyOStream>& out,
149  const Teuchos::RCP<Teuchos::ParameterList>& plist)
150  : comm_ (comm)
151  , out_ (::Tpetra::Details::makeValidVerboseStream (out))
152  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
153  , sendType_ (Details::DISTRIBUTOR_SEND)
154  , barrierBetween_ (barrierBetween_default)
155  , verbose_ (tpetraDistributorDebugDefault)
156  , selfMessage_ (false)
157  , numSends_ (0)
158  , maxSendLength_ (0)
159  , numReceives_ (0)
160  , totalReceiveLength_ (0)
161  , lastRoundBytesSend_ (0)
162  , lastRoundBytesRecv_ (0)
163  , useDistinctTags_ (useDistinctTags_default)
164  {
165  TEUCHOS_ASSERT( ! out_.is_null () );
166 
167  this->setParameterList (plist); // sets verbose_ via Behavior
168 #ifdef TPETRA_DISTRIBUTOR_TIMERS
169  makeTimers ();
170 #endif // TPETRA_DISTRIBUTOR_TIMERS
171  }
172 
173  Distributor::
174  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
175  : Distributor (comm, Teuchos::null, Teuchos::null)
176  {}
177 
178  Distributor::
179  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
180  const Teuchos::RCP<Teuchos::FancyOStream>& out)
181  : Distributor (comm, out, Teuchos::null)
182  {}
183 
184  Distributor::
185  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
186  const Teuchos::RCP<Teuchos::ParameterList>& plist)
187  : Distributor (comm, Teuchos::null, plist)
188  {}
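
 // Example (editor's sketch; not part of the original source): a typical
 // forward communication plan. doPostsAndWaits is a templated method
 // declared in Tpetra_Distributor.hpp; exportProcIDs, exports, and imports
 // below are placeholder names.
 //
 //   Teuchos::RCP<const Teuchos::Comm<int> > comm = Tpetra::getDefaultComm ();
 //   Tpetra::Distributor distributor (comm);
 //   // exportProcIDs[i] is the rank that should receive exports[i].
 //   const size_t numImports = distributor.createFromSends (exportProcIDs);
 //   Teuchos::Array<double> imports (numImports);
 //   distributor.doPostsAndWaits<double> (exports ().getConst (), 1, imports ());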
189 
190  Distributor::
191  Distributor (const Distributor& distributor)
192  : comm_ (distributor.comm_)
193  , out_ (distributor.out_)
194  , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
195  , sendType_ (distributor.sendType_)
196  , barrierBetween_ (distributor.barrierBetween_)
197  , verbose_ (distributor.verbose_)
198  , selfMessage_ (distributor.selfMessage_)
199  , numSends_ (distributor.numSends_)
200  , procsTo_ (distributor.procsTo_)
201  , startsTo_ (distributor.startsTo_)
202  , lengthsTo_ (distributor.lengthsTo_)
203  , maxSendLength_ (distributor.maxSendLength_)
204  , indicesTo_ (distributor.indicesTo_)
205  , numReceives_ (distributor.numReceives_)
206  , totalReceiveLength_ (distributor.totalReceiveLength_)
207  , lengthsFrom_ (distributor.lengthsFrom_)
208  , procsFrom_ (distributor.procsFrom_)
209  , startsFrom_ (distributor.startsFrom_)
210  , indicesFrom_ (distributor.indicesFrom_)
211  , reverseDistributor_ (distributor.reverseDistributor_)
212  , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
213  , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
214  , useDistinctTags_ (distributor.useDistinctTags_)
215  {
216  using Teuchos::ParameterList;
217  using Teuchos::RCP;
218  using Teuchos::rcp;
219 
220  TEUCHOS_ASSERT( ! out_.is_null () );
221 
222  RCP<const ParameterList> rhsList = distributor.getParameterList ();
223  RCP<ParameterList> newList = rhsList.is_null () ? Teuchos::null :
224  Teuchos::parameterList (*rhsList);
225  this->setParameterList (newList);
226 
227 #ifdef TPETRA_DISTRIBUTOR_TIMERS
228  makeTimers ();
229 #endif // TPETRA_DISTRIBUTOR_TIMERS
230  }
231 
232  void Distributor::swap (Distributor& rhs) {
233  using Teuchos::ParameterList;
234  using Teuchos::parameterList;
235  using Teuchos::RCP;
236 
237  std::swap (comm_, rhs.comm_);
238  std::swap (out_, rhs.out_);
239  std::swap (howInitialized_, rhs.howInitialized_);
240  std::swap (sendType_, rhs.sendType_);
241  std::swap (barrierBetween_, rhs.barrierBetween_);
242  std::swap (verbose_, rhs.verbose_);
243  std::swap (selfMessage_, rhs.selfMessage_);
244  std::swap (numSends_, rhs.numSends_);
245  std::swap (procsTo_, rhs.procsTo_);
246  std::swap (startsTo_, rhs.startsTo_);
247  std::swap (lengthsTo_, rhs.lengthsTo_);
248  std::swap (maxSendLength_, rhs.maxSendLength_);
249  std::swap (indicesTo_, rhs.indicesTo_);
250  std::swap (numReceives_, rhs.numReceives_);
251  std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
252  std::swap (lengthsFrom_, rhs.lengthsFrom_);
253  std::swap (procsFrom_, rhs.procsFrom_);
254  std::swap (startsFrom_, rhs.startsFrom_);
255  std::swap (indicesFrom_, rhs.indicesFrom_);
256  std::swap (reverseDistributor_, rhs.reverseDistributor_);
257  std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
258  std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
259  std::swap (useDistinctTags_, rhs.useDistinctTags_);
260 
261  // Swap parameter lists. If they are the same object, make a deep
262  // copy first, so that modifying one won't modify the other one.
263  RCP<ParameterList> lhsList = this->getNonconstParameterList ();
264  RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
265  if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
266  rhsList = parameterList (*rhsList);
267  }
268  if (! rhsList.is_null ()) {
269  this->setMyParamList (rhsList);
270  }
271  if (! lhsList.is_null ()) {
272  rhs.setMyParamList (lhsList);
273  }
274 
275  // We don't need to swap timers, because all instances of
276  // Distributor use the same timers.
277  }
278 
279  void
280  Distributor::
281  setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
282  {
283  using ::Tpetra::Details::Behavior;
284  using Teuchos::FancyOStream;
285  using Teuchos::getIntegralValue;
286  using Teuchos::includesVerbLevel;
287  using Teuchos::OSTab;
288  using Teuchos::ParameterList;
289  using Teuchos::parameterList;
290  using Teuchos::RCP;
291  using std::endl;
292 
293  const bool verboseDefault = Behavior::verbose ("Distributor") ||
294  Behavior::verbose ("Tpetra::Distributor");
295 
296  if (plist.is_null ()) {
297  verbose_ = verboseDefault;
298  }
299  else {
300  RCP<const ParameterList> validParams = getValidParameters ();
301  plist->validateParametersAndSetDefaults (*validParams);
302 
303  const bool barrierBetween =
304  plist->get<bool> ("Barrier between receives and sends");
305  const Details::EDistributorSendType sendType =
306  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
307  const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
308  const bool debug = plist->get<bool> ("Debug");
309  {
310  // mfh 03 May 2016: We keep this option only for backwards
311  // compatibility, but it must always be true. See discussion of
312  // Github Issue #227.
313  const bool enable_cuda_rdma =
314  plist->get<bool> ("Enable MPI CUDA RDMA support");
315  TEUCHOS_TEST_FOR_EXCEPTION
316  (! enable_cuda_rdma, std::invalid_argument, "Tpetra::Distributor::"
317  "setParameterList: " << "You specified \"Enable MPI CUDA RDMA "
318  "support\" = false. This is no longer valid. You don't need to "
319  "specify this option any more; Tpetra assumes it is always true. "
320  "This is a very light assumption on the MPI implementation, and in "
321  "fact does not actually involve hardware or system RDMA support. "
322  "Tpetra just assumes that the MPI implementation can tell whether a "
323  "pointer points to host memory or CUDA device memory.");
324  }
325 
326  // We check this property explicitly, since we haven't yet learned
327  // how to make a validator that can cross-check properties.
328  // Later, turn this into a validator so that it can be embedded in
329  // the valid ParameterList and used in Optika.
330  TEUCHOS_TEST_FOR_EXCEPTION
331  (! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
332  std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
333  << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
334  "between receives and sends." << endl << "This is invalid; you must "
335  "include the barrier if you use ready sends." << endl << "Ready sends "
336  "require that their corresponding receives have already been posted, "
337  "and the only way to guarantee that in general is with a barrier.");
338 
339  // Now that we've validated the input list, save the results.
340  sendType_ = sendType;
341  barrierBetween_ = barrierBetween;
342  useDistinctTags_ = useDistinctTags;
343  verbose_ = debug || verboseDefault;
344 
345  // ParameterListAcceptor semantics require pointer identity of the
346  // sublist passed to setParameterList(), so we save the pointer.
347  this->setMyParamList (plist);
348  }
349  }
350 
351  Teuchos::RCP<const Teuchos::ParameterList>
352  Distributor::getValidParameters () const
353  {
354  using Teuchos::Array;
355  using Teuchos::ParameterList;
356  using Teuchos::parameterList;
357  using Teuchos::RCP;
358  using Teuchos::setStringToIntegralParameter;
359 
360  const bool barrierBetween = barrierBetween_default;
361  const bool useDistinctTags = useDistinctTags_default;
362  const bool debug = tpetraDistributorDebugDefault;
363 
364  Array<std::string> sendTypes = distributorSendTypes ();
365  const std::string defaultSendType ("Send");
366  Array<Details::EDistributorSendType> sendTypeEnums;
367  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
368  sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
369  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
370  sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
371 
372  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
373  plist->set ("Barrier between receives and sends", barrierBetween,
374  "Whether to execute a barrier between receives and sends in do"
375  "[Reverse]Posts(). Required for correctness when \"Send type\""
376  "=\"Rsend\", otherwise correct but not recommended.");
377  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
378  defaultSendType, "When using MPI, the variant of send to use in "
379  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
380  plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
381  "MPI message tags for different code paths. Highly recommended"
382  " to avoid message collisions.");
383  plist->set ("Debug", debug, "Whether to print copious debugging output on "
384  "all processes.");
385  plist->set ("Timer Label","","Label for Time Monitor output");
386  plist->set ("Enable MPI CUDA RDMA support", true, "Assume that MPI can "
387  "tell whether a pointer points to host memory or CUDA device "
388  "memory. You don't need to specify this option any more; "
389  "Tpetra assumes it is always true. This is a very light "
390  "assumption on the MPI implementation, and in fact does not "
391  "actually involve hardware or system RDMA support.");
392 
393  // mfh 24 Dec 2015: Tpetra no longer inherits from
394  // Teuchos::VerboseObject, so it doesn't need the "VerboseObject"
395  // sublist. However, we retain the "VerboseObject" sublist
396  // anyway, for backwards compatibility (otherwise the above
397  // validation would fail with an invalid parameter name, should
398  // the user still want to provide this list).
399  Teuchos::setupVerboseObjectSublist (&*plist);
400  return Teuchos::rcp_const_cast<const ParameterList> (plist);
401  }
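
 // Example (editor's sketch; not part of the original source): a
 // ParameterList that the validation above accepts. The parameter names
 // come from getValidParameters(); the particular settings shown are
 // illustrative only.
 //
 //   Teuchos::RCP<Teuchos::ParameterList> plist =
 //     Teuchos::parameterList ("Tpetra::Distributor");
 //   plist->set ("Send type", std::string ("Isend"));
 //   plist->set ("Use distinct tags", true);
 //   distributor.setParameterList (plist);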
402 
403 
404  size_t Distributor::getTotalReceiveLength () const
405  { return totalReceiveLength_; }
406 
407  size_t Distributor::getNumReceives () const
408  { return numReceives_; }
409 
410  bool Distributor::hasSelfMessage () const
411  { return selfMessage_; }
412 
413  size_t Distributor::getNumSends () const
414  { return numSends_; }
415 
416  size_t Distributor::getMaxSendLength () const
417  { return maxSendLength_; }
418 
419  Teuchos::ArrayView<const int> Distributor::getProcsFrom() const
420  { return procsFrom_; }
421 
422  Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
423  { return lengthsFrom_; }
424 
425  Teuchos::ArrayView<const int> Distributor::getProcsTo() const
426  { return procsTo_; }
427 
428  Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
429  { return lengthsTo_; }
430 
431  Teuchos::RCP<Distributor>
432  Distributor::getReverse () const {
433  if (reverseDistributor_.is_null ()) {
434  createReverseDistributor ();
435  }
436  TEUCHOS_TEST_FOR_EXCEPTION
437  (reverseDistributor_.is_null (), std::logic_error, "The reverse "
438  "Distributor is null after createReverseDistributor returned. "
439  "Please report this bug to the Tpetra developers.");
440  return reverseDistributor_;
441  }
442 
443 
444  void
445  Distributor::createReverseDistributor() const
446  {
447  reverseDistributor_ = Teuchos::rcp (new Distributor (comm_, out_));
448  reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
449  reverseDistributor_->sendType_ = sendType_;
450  reverseDistributor_->barrierBetween_ = barrierBetween_;
451  reverseDistributor_->verbose_ = verbose_;
452 
453  // The total length of all the sends of this Distributor. We
454  // calculate it because it's the total length of all the receives
455  // of the reverse Distributor.
456  size_t totalSendLength =
457  std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
458 
459  // The maximum length of any of the receives of this Distributor.
460  // We calculate it because it's the maximum length of any of the
461  // sends of the reverse Distributor.
462  size_t maxReceiveLength = 0;
463  const int myProcID = comm_->getRank();
464  for (size_t i=0; i < numReceives_; ++i) {
465  if (procsFrom_[i] != myProcID) {
466  // Don't count receives for messages sent by myself to myself.
467  if (lengthsFrom_[i] > maxReceiveLength) {
468  maxReceiveLength = lengthsFrom_[i];
469  }
470  }
471  }
472 
473  // Initialize all of reverseDistributor's data members. This
474  // mainly just involves flipping "send" and "receive," or the
475  // equivalent "to" and "from."
476 
477  reverseDistributor_->selfMessage_ = selfMessage_;
478  reverseDistributor_->numSends_ = numReceives_;
479  reverseDistributor_->procsTo_ = procsFrom_;
480  reverseDistributor_->startsTo_ = startsFrom_;
481  reverseDistributor_->lengthsTo_ = lengthsFrom_;
482  reverseDistributor_->maxSendLength_ = maxReceiveLength;
483  reverseDistributor_->indicesTo_ = indicesFrom_;
484  reverseDistributor_->numReceives_ = numSends_;
485  reverseDistributor_->totalReceiveLength_ = totalSendLength;
486  reverseDistributor_->lengthsFrom_ = lengthsTo_;
487  reverseDistributor_->procsFrom_ = procsTo_;
488  reverseDistributor_->startsFrom_ = startsTo_;
489  reverseDistributor_->indicesFrom_ = indicesTo_;
490 
491  // requests_: Allocated on demand.
492  // reverseDistributor_: See note below
493 
494  // mfh 31 Mar 2016: These are statistics, kept on calls to
495  // doPostsAndWaits or doReversePostsAndWaits. They weren't here
496  // when I started, and I didn't add them, so I don't know if they
497  // are accurate.
498  reverseDistributor_->lastRoundBytesSend_ = 0;
499  reverseDistributor_->lastRoundBytesRecv_ = 0;
500 
501  reverseDistributor_->useDistinctTags_ = useDistinctTags_;
502 
503  // I am my reverse Distributor's reverse Distributor.
504  // Thus, it would be legit to do the following:
505  //
506  // reverseDistributor_->reverseDistributor_ = Teuchos::rcp (this, false);
507  //
508  // (Note use of a "weak reference" to avoid a circular RCP
509  // dependency.) The only issue is that if users hold on to the
510  // reverse Distributor but let go of the forward one, this
511  // reference won't be valid anymore. However, the reverse
512  // Distributor is really an implementation detail of Distributor
513  // and not meant to be used directly, so we don't need to do this.
514  reverseDistributor_->reverseDistributor_ = Teuchos::null;
515  }
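
 // Example (editor's sketch; not part of the original source): the reverse
 // Distributor runs the same plan in the opposite direction, so data
 // received by a forward doPostsAndWaits can be sent back with
 // doReversePostsAndWaits (both declared in Tpetra_Distributor.hpp).
 // Buffer names below are placeholders.
 //
 //   distributor.doPostsAndWaits<double> (exports ().getConst (), 1, imports ());
 //   distributor.doReversePostsAndWaits<double> (imports ().getConst (), 1,
 //                                               returnedExports ());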
516 
517 
518  void Distributor::doWaits () {
519  using Teuchos::Array;
520  using Teuchos::CommRequest;
521  using Teuchos::FancyOStream;
522  using Teuchos::includesVerbLevel;
523  using Teuchos::is_null;
524  using Teuchos::OSTab;
525  using Teuchos::RCP;
526  using Teuchos::waitAll;
527  using std::endl;
528 
529  Teuchos::OSTab tab (out_);
530 
531 #ifdef TPETRA_DISTRIBUTOR_TIMERS
532  Teuchos::TimeMonitor timeMon (*timer_doWaits_);
533 #endif // TPETRA_DISTRIBUTOR_TIMERS
534 
535  const int myRank = comm_->getRank ();
536 
537  if (verbose_) {
538  std::ostringstream os;
539  os << myRank << ": doWaits: # reqs = "
540  << requests_.size () << endl;
541  *out_ << os.str ();
542  }
543 
544  if (requests_.size() > 0) {
545  waitAll (*comm_, requests_());
546 
547 #ifdef HAVE_TEUCHOS_DEBUG
548  // Make sure that waitAll() nulled out all the requests.
549  for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
550  it != requests_.end(); ++it)
551  {
552  TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
553  Teuchos::typeName(*this) << "::doWaits(): Communication requests "
554  "should all be null aftr calling Teuchos::waitAll() on them, but "
555  "at least one request is not null.");
556  }
557 #endif // HAVE_TEUCHOS_DEBUG
558  // Restore the invariant that requests_.size() is the number of
559  // outstanding nonblocking communication requests.
560  requests_.resize (0);
561  }
562 
563 #ifdef HAVE_TEUCHOS_DEBUG
564  {
565  const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
566  int globalSizeNonzero = 0;
567  Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
568  localSizeNonzero,
569  Teuchos::outArg (globalSizeNonzero));
570  TEUCHOS_TEST_FOR_EXCEPTION(
571  globalSizeNonzero != 0, std::runtime_error,
572  "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
573  "a nonzero number of outstanding posts. There should be none at this "
574  "point. Please report this bug to the Tpetra developers.");
575  }
576 #endif // HAVE_TEUCHOS_DEBUG
577 
578  if (verbose_) {
579  std::ostringstream os;
580  os << myRank << ": doWaits done" << endl;
581  *out_ << os.str ();
582  }
583  }
584 
585  void Distributor::doReverseWaits () {
586  // call doWaits() on the reverse Distributor, if it exists
587  if (! reverseDistributor_.is_null()) {
588  reverseDistributor_->doWaits();
589  }
590  }
591 
592  std::string Distributor::description () const {
593  std::ostringstream out;
594 
595  out << "\"Tpetra::Distributor\": {";
596  const std::string label = this->getObjectLabel ();
597  if (label != "") {
598  out << "Label: " << label << ", ";
599  }
600  out << "How initialized: "
602  << ", Parameters: {"
603  << "Send type: "
604  << DistributorSendTypeEnumToString (sendType_)
605  << ", Barrier between receives and sends: "
606  << (barrierBetween_ ? "true" : "false")
607  << ", Use distinct tags: "
608  << (useDistinctTags_ ? "true" : "false")
609  << ", Debug: " << (verbose_ ? "true" : "false")
610  << "}}";
611  return out.str ();
612  }
613 
614  std::string
615  Distributor::
616  localDescribeToString (const Teuchos::EVerbosityLevel vl) const
617  {
618  using Teuchos::toString;
619  using Teuchos::VERB_HIGH;
620  using Teuchos::VERB_EXTREME;
621  using std::endl;
622 
623  // This preserves current behavior of Distributor.
624  if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
625  return std::string ();
626  }
627 
628  auto outStringP = Teuchos::rcp (new std::ostringstream ());
629  auto outp = Teuchos::getFancyOStream (outStringP); // returns RCP
630  Teuchos::FancyOStream& out = *outp;
631 
632  const int myRank = comm_->getRank ();
633  const int numProcs = comm_->getSize ();
634  out << "Process " << myRank << " of " << numProcs << ":" << endl;
635  Teuchos::OSTab tab1 (out);
636 
637  out << "selfMessage: " << hasSelfMessage () << endl;
638  out << "numSends: " << getNumSends () << endl;
639  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
640  out << "procsTo: " << toString (procsTo_) << endl;
641  out << "lengthsTo: " << toString (lengthsTo_) << endl;
642  out << "maxSendLength: " << getMaxSendLength () << endl;
643  }
644  if (vl == VERB_EXTREME) {
645  out << "startsTo: " << toString (startsTo_) << endl;
646  out << "indicesTo: " << toString (indicesTo_) << endl;
647  }
648  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
649  out << "numReceives: " << getNumReceives () << endl;
650  out << "totalReceiveLength: " << getTotalReceiveLength () << endl;
651  out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
652  out << "startsFrom: " << toString (startsFrom_) << endl;
653  out << "procsFrom: " << toString (procsFrom_) << endl;
654  }
655 
656  out.flush (); // make sure the ostringstream got everything
657  return outStringP->str ();
658  }
659 
660  void
661  Distributor::
662  describe (Teuchos::FancyOStream &out,
663  const Teuchos::EVerbosityLevel verbLevel) const
664  {
665  using std::endl;
666  using Teuchos::VERB_DEFAULT;
667  using Teuchos::VERB_NONE;
668  using Teuchos::VERB_LOW;
669  using Teuchos::VERB_MEDIUM;
670  using Teuchos::VERB_HIGH;
671  using Teuchos::VERB_EXTREME;
672  const Teuchos::EVerbosityLevel vl =
673  (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
674 
675  if (vl == VERB_NONE) {
676  return; // don't print anything
677  }
678  // If this Distributor's Comm is null, then the calling
679  // process does not participate in Distributor-related collective
680  // operations with the other processes. In that case, it is not
681  // even legal to call this method. The reasonable thing to do in
682  // that case is nothing.
683  if (comm_.is_null ()) {
684  return;
685  }
686  const int myRank = comm_->getRank ();
687  const int numProcs = comm_->getSize ();
688 
689  // Only Process 0 should touch the output stream, but this method
690  // in general may need to do communication. Thus, we may need to
691  // preserve the current tab level across multiple "if (myRank ==
692  // 0) { ... }" inner scopes. This is why we sometimes create
693  // OSTab instances by pointer, instead of by value. We only need
694  // to create them by pointer if the tab level must persist through
695  // multiple inner scopes.
696  Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
697 
698  if (myRank == 0) {
699  // At every verbosity level but VERB_NONE, Process 0 prints.
700  // By convention, describe() always begins with a tab before
701  // printing.
702  tab0 = Teuchos::rcp (new Teuchos::OSTab (out));
703  // We quote the class name because it contains colons.
704  // This makes the output valid YAML.
705  out << "\"Tpetra::Distributor\":" << endl;
706  tab1 = Teuchos::rcp (new Teuchos::OSTab (out));
707 
708  const std::string label = this->getObjectLabel ();
709  if (label != "") {
710  out << "Label: " << label << endl;
711  }
712  out << "Number of processes: " << numProcs << endl
713  << "How initialized: "
715  << endl;
716  {
717  out << "Parameters: " << endl;
718  Teuchos::OSTab tab2 (out);
719  out << "\"Send type\": "
720  << DistributorSendTypeEnumToString (sendType_) << endl
721  << "\"Barrier between receives and sends\": "
722  << (barrierBetween_ ? "true" : "false") << endl
723  << "\"Use distinct tags\": "
724  << (useDistinctTags_ ? "true" : "false") << endl
725  << "\"Debug\": " << (verbose_ ? "true" : "false") << endl;
726  }
727  } // if myRank == 0
728 
729  // This is collective over the Map's communicator.
730  if (vl > VERB_LOW) {
731  const std::string lclStr = this->localDescribeToString (vl);
732  Tpetra::Details::gathervPrint (out, lclStr, *comm_);
733  }
734 
735  out << "Reverse Distributor:";
736  if (reverseDistributor_.is_null ()) {
737  out << " null" << endl;
738  }
739  else {
740  out << endl;
741  reverseDistributor_->describe (out, vl);
742  }
743  }
744 
745  void
746  Distributor::
747  computeReceives ()
748  {
749  using Teuchos::Array;
750  using Teuchos::ArrayRCP;
751  using Teuchos::as;
752  using Teuchos::CommStatus;
753  using Teuchos::CommRequest;
754  using Teuchos::ireceive;
755  using Teuchos::RCP;
756  using Teuchos::rcp;
757  using Teuchos::REDUCE_SUM;
758  using Teuchos::receive;
759  using Teuchos::reduce;
760  using Teuchos::scatter;
761  using Teuchos::send;
762  using Teuchos::waitAll;
763  using std::endl;
764 
765  Teuchos::OSTab tab (out_);
766  const int myRank = comm_->getRank();
767  const int numProcs = comm_->getSize();
768 
769  // MPI tag for nonblocking receives and blocking sends in this method.
770  const int pathTag = 2;
771  const int tag = this->getTag (pathTag);
772 
773  std::unique_ptr<std::string> prefix;
774  if (verbose_) {
775  std::ostringstream os;
776  os << "Proc " << myRank << ": computeReceives: ";
777  prefix = std::unique_ptr<std::string> (new std::string (os.str ()));
778  os << "{selfMessage_: " << (selfMessage_ ? "true" : "false")
779  << ", tag: " << tag << "}" << endl;
780  *out_ << os.str ();
781  }
782 
783  // toProcsFromMe[i] == the number of messages sent by this process
784  // to process i. The data in numSends_, procsTo_, and lengthsTo_
785  // concern the contiguous sends. Therefore, each process will be
786  // listed in procsTo_ at most once, and so toProcsFromMe[i] will
787  // either be 0 or 1.
788  {
789  Array<int> toProcsFromMe (numProcs, 0);
790 #ifdef HAVE_TEUCHOS_DEBUG
791  bool counting_error = false;
792 #endif // HAVE_TEUCHOS_DEBUG
793  for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
794 #ifdef HAVE_TEUCHOS_DEBUG
795  if (toProcsFromMe[procsTo_[i]] != 0) {
796  counting_error = true;
797  }
798 #endif // HAVE_TEUCHOS_DEBUG
799  toProcsFromMe[procsTo_[i]] = 1;
800  }
801 #ifdef HAVE_TEUCHOS_DEBUG
802  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
803  "Tpetra::Distributor::computeReceives: There was an error on at least "
804  "one process in counting the number of messages send by that process to "
805  "the other processs. Please report this bug to the Tpetra developers.",
806  *comm_);
807 #endif // HAVE_TEUCHOS_DEBUG
808 
809  if (verbose_) {
810  std::ostringstream os;
811  os << *prefix << "Reduce & scatter" << endl;
812  *out_ << os.str ();
813  }
814 
815  // Compute the number of receives that this process needs to
816  // post. The number of receives includes any self sends (i.e.,
817  // messages sent by this process to itself).
818  //
819  // (We will use numReceives_ below to post exactly that
820  // number of receives, with MPI_ANY_SOURCE as the sending rank.
821  // This will tell us from which processes this process expects
822  // to receive, and how many packets of data we expect to receive
823  // from each process.)
824  //
825  // toProcsFromMe[i] is the number of messages sent by this
826  // process to process i. Compute the sum (elementwise) of all
827  // the toProcsFromMe arrays on all processes in the
828  // communicator. If the array x is that sum, then if this
829  // process has rank j, x[j] is the number of messages sent
830  // to process j, that is, the number of receives on process j
831  // (including any messages sent by process j to itself).
832  //
833  // Yes, this requires storing and operating on an array of
834  // length P, where P is the number of processes in the
835  // communicator. Epetra does this too. Avoiding this O(P)
836  // memory bottleneck would require some research.
837  //
838  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
839  // implement this O(P) memory algorithm.
840  //
841  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
842  // process (0) from toProcsFromMe, to numRecvsOnEachProc.
843  // Then, scatter the latter, so that each process p gets
844  // numRecvsOnEachProc[p].
845  //
846  // 2. Like #1, but use MPI_Reduce_scatter instead of
847  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
848  // optimized to reduce the number of messages, but
849  // MPI_Reduce_scatter is more general than we need (it
850  // allows the equivalent of MPI_Scatterv). See Bug 6336.
851  //
852  // 3. Do an all-reduce on toProcsFromMe, and let my process
853  // (with rank myRank) get numReceives_ from
854  // toProcsFromMe[myRank]. The HPCCG miniapp uses the
855  // all-reduce method.
856  //
857  // Approaches 1 and 3 have the same critical path length.
858  // However, #3 moves more data. This is because the final
859  // result is just one integer, but #3 moves a whole array of
860  // results to all the processes. This is why we use Approach 1
861  // here.
862  //
863  // mfh 12 Apr 2013: See discussion in createFromSends() about
864  // how we could use this communication to propagate an error
865  // flag for "free" in a release build.
866 
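 // Editor's note (worked example, not in the original source): with three
 // processes where process 0 sends to {1}, process 1 sends to {0, 2}, and
 // process 2 sends to {1}, the toProcsFromMe arrays are [0 1 0], [1 0 1],
 // and [0 1 0]. Their elementwise sum is [1 2 1], so after the reduce and
 // scatter below, numReceives_ is 1 on processes 0 and 2, and 2 on process 1.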
867  const int root = 0; // rank of root process of the reduction
868  Array<int> numRecvsOnEachProc; // temp; only needed on root
869  if (myRank == root) {
870  numRecvsOnEachProc.resize (numProcs);
871  }
872  int numReceivesAsInt = 0; // output
873  reduce<int, int> (toProcsFromMe.getRawPtr (),
874  numRecvsOnEachProc.getRawPtr (),
875  numProcs, REDUCE_SUM, root, *comm_);
876  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
877  &numReceivesAsInt, 1, root, *comm_);
878  numReceives_ = static_cast<size_t> (numReceivesAsInt);
879  }
880 
881  // Now we know numReceives_, which is this process' number of
882  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
883  // with this number of entries.
884  lengthsFrom_.assign (numReceives_, 0);
885  procsFrom_.assign (numReceives_, 0);
886 
887  //
888  // Ask (via nonblocking receive) each process from which we are
889  // receiving how many packets we should expect from it in the
890  // communication pattern.
891  //
892 
893  // At this point, numReceives_ includes any self message that
894  // there may be. At the end of this routine, we'll subtract off
895  // the self message (if there is one) from numReceives_. In this
896  // routine, we don't need to receive a message from ourselves in
897  // order to figure out our lengthsFrom_ and source process ID; we
898  // can just ask ourselves directly. Thus, the actual number of
899  // nonblocking receives we post here does not include the self
900  // message.
901  const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
902 
903  // Teuchos' wrapper for nonblocking receives requires receive
904  // buffers that it knows won't go away. This is why we use RCPs,
905  // one RCP per nonblocking receive request. They get allocated in
906  // the loop below.
907  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
908  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
909  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
910 
911  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
912  // (receive data from any process).
913 #ifdef HAVE_MPI
914  const int anySourceProc = MPI_ANY_SOURCE;
915 #else
916  const int anySourceProc = -1;
917 #endif
918 
919  if (verbose_) {
920  std::ostringstream os;
921  os << *prefix << "Post " << actualNumReceives << " irecv"
922  << (actualNumReceives != size_t (1) ? "s" : "") << endl;
923  *out_ << os.str ();
924  }
925 
926  // Post the (nonblocking) receives.
927  for (size_t i = 0; i < actualNumReceives; ++i) {
928  // Once the receive completes, we can ask the corresponding
929  // CommStatus object (output by wait()) for the sending process'
930  // ID (which we'll assign to procsFrom_[i] -- don't forget to
931  // do that!).
932  lengthsFromBuffers[i].resize (1);
933  lengthsFromBuffers[i][0] = as<size_t> (0);
934  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
935  tag, *comm_);
936  if (verbose_) {
937  std::ostringstream os;
938  os << *prefix << "Posted any-proc irecv w/ tag " << tag << endl;
939  *out_ << os.str ();
940  }
941  }
942 
943  if (verbose_) {
944  std::ostringstream os;
945  os << *prefix << "Post " << numSends_ << " send"
946  << (numSends_ != size_t (1) ? "s" : "") << endl;
947  *out_ << os.str ();
948  }
949  // Post the sends: Tell each process to which we are sending how
950  // many packets it should expect from us in the communication
951  // pattern. We could use nonblocking sends here, as long as we do
952  // a waitAll() on all the sends and receives at once.
953  //
954  // We assume that numSends_ and selfMessage_ have already been
955  // set. The value of numSends_ (my process' number of sends) does
956  // not include any message that it might send to itself.
957  for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
958  if (procsTo_[i] != myRank) {
959  // Send a message to procsTo_[i], telling that process that
960  // this communication pattern will send that process
961  // lengthsTo_[i] blocks of packets.
962  const size_t* const lengthsTo_i = &lengthsTo_[i];
963  send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
964  if (verbose_) {
965  std::ostringstream os;
966  os << *prefix << "Posted send to Proc " << procsTo_[i] << " w/ tag "
967  << tag << endl;
968  *out_ << os.str ();
969  }
970  }
971  else {
972  // We don't need a send in the self-message case. If this
973  // process will send a message to itself in the communication
974  // pattern, then the last element of lengthsFrom_ and
975  // procsFrom_ corresponds to the self-message. Of course
976  // this process knows how long the message is, and the process
977  // ID is its own process ID.
978  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
979  procsFrom_[numReceives_-1] = myRank;
980  }
981  }
982 
983  if (verbose_) {
984  std::ostringstream os;
985  os << myRank << ": computeReceives: waitAll on "
986  << requests.size () << " requests" << endl;
987  *out_ << os.str ();
988  }
989  //
990  // Wait on all the receives. When they arrive, check the status
991  // output of wait() for the receiving process ID, unpack the
992  // request buffers into lengthsFrom_, and set procsFrom_ from the
993  // status.
994  //
995  waitAll (*comm_, requests (), statuses ());
996  for (size_t i = 0; i < actualNumReceives; ++i) {
997  lengthsFrom_[i] = *lengthsFromBuffers[i];
998  procsFrom_[i] = statuses[i]->getSourceRank ();
999  }
1000 
1001  // Sort the procsFrom_ array, and apply the same permutation to
1002  // lengthsFrom_. This ensures that procsFrom_[i] and
1003  // lengthsFrom_[i] refer to the same thing.
1004  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
1005 
1006  // Compute indicesFrom_
1007  totalReceiveLength_ =
1008  std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
1009  indicesFrom_.clear ();
1010  // NOTE (mfh 13 Feb 2019): Epetra_MpiDistributor deliberately does
1011  // _not_ fill indicesFrom_ (what it calls "indices_from_") like
1012  // this; it leaves indicesFrom_ empty. The comment there mentions
1013  // that not filling indicesFrom_ helps reverse mode correctness.
1014 #if 0
1015  indicesFrom_.reserve (totalReceiveLength_);
1016  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1017  indicesFrom_.push_back(i);
1018  }
1019 #endif // 0
1020 
1021  startsFrom_.clear ();
1022  startsFrom_.reserve (numReceives_);
1023  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
1024  startsFrom_.push_back(j);
1025  j += lengthsFrom_[i];
1026  }
1027 
1028  if (selfMessage_) {
1029  --numReceives_;
1030  }
1031 
1032  if (verbose_) {
1033  std::ostringstream os;
1034  os << *prefix << "Done!" << endl;
1035  *out_ << os.str ();
1036  }
1037  }
1038 
1039  size_t
1040  Distributor::
1041  createFromSends (const Teuchos::ArrayView<const int>& exportProcIDs)
1042  {
1043  using Teuchos::outArg;
1044  using Teuchos::REDUCE_MAX;
1045  using Teuchos::reduceAll;
1046  using std::endl;
1047  const char rawPrefix[] = "Tpetra::Distributor::createFromSends: ";
1048 
1049  Teuchos::OSTab tab (out_);
1050  const size_t numExports = exportProcIDs.size();
1051  const int myProcID = comm_->getRank();
1052  const int numProcs = comm_->getSize();
1053 
1054  std::unique_ptr<std::string> prefix;
1055  if (verbose_) {
1056  std::ostringstream os;
1057  os << "Proc " << myProcID << ": " << rawPrefix << ": ";
1058  prefix = std::unique_ptr<std::string> (new std::string (os.str ()));
1059  os << "exportPIDs: " << exportProcIDs << endl;
1060  *out_ << os.str ();
1061  }
1062 
1063  // exportProcIDs tells us the communication pattern for this
1064  // distributor. It dictates the way that the export data will be
1065  // interpreted in doPosts(). We want to perform at most one
1066  // send per process in doPosts; this is for two reasons:
1067  // * minimize latency / overhead in the comm routines (nice)
1068  // * match the number of receives and sends between processes
1069  // (necessary)
1070  //
1071  // Teuchos::Comm requires that the data for a send are contiguous
1072  // in a send buffer. Therefore, if the data in the send buffer
1073  // for doPosts() are not contiguous, they will need to be copied
1074  // into a contiguous buffer. The user has specified this
1075  // noncontiguous pattern and we can't do anything about it.
1076  // However, if they do not provide an efficient pattern, we will
1077  // warn them if one of the following compile-time options has been
1078  // set:
1079  // * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
1080  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
1081  //
1082  // If the data are contiguous, then we can post the sends in situ
1083  // (i.e., without needing to copy them into a send buffer).
1084  //
1085  // Determine contiguity. There are a number of ways to do this:
1086  // * If the export IDs are sorted, then all exports to a
1087  // particular proc must be contiguous. This is what Epetra does.
1088  // * If the export ID of the current export already has been
1089  // listed, then the previous listing should correspond to the
1090  // same export. This tests contiguity, but not sortedness.
1091  //
1092  // Both of these tests require O(n), where n is the number of
1093  // exports. However, the latter will positively identify a greater
1094  // portion of contiguous patterns. We use the latter method.
1095  //
1096  // Check to see if values are grouped by procs without gaps
1097  // If so, indices_to -> 0.
1098 
1099  // Set up data structures for quick traversal of arrays.
1100  // This contains the number of sends for each process ID.
1101  //
1102  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
1103  // that create an array of length the number of processes in the
1104  // communicator (plus one). Given how this code uses this array,
1105  // it should be straightforward to replace it with a hash table or
1106  // some other more space-efficient data structure. In practice,
1107  // most of the entries of starts should be zero for a sufficiently
1108  // large process count, unless the communication pattern is dense.
1109  // Note that it's important to be able to iterate through keys (i
1110  // for which starts[i] is nonzero) in increasing order.
1111  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1112 
1113  // numActive is the number of sends that are not Null
1114  size_t numActive = 0;
1115  int needSendBuff = 0; // Boolean
1116 
1117 #ifdef HAVE_TPETRA_DEBUG
1118  int badID = -1; // only used in a debug build
1119 #endif // HAVE_TPETRA_DEBUG
1120  for (size_t i = 0; i < numExports; ++i) {
1121  const int exportID = exportProcIDs[i];
1122  if (exportID >= numProcs) {
1123 #ifdef HAVE_TPETRA_DEBUG
1124  badID = myProcID;
1125 #endif // HAVE_TPETRA_DEBUG
1126  break;
1127  }
1128  else if (exportID >= 0) {
1129  // exportID is a valid process ID. Increment the number of
1130  // messages this process will send to that process.
1131  ++starts[exportID];
1132 
1133  // If we're sending more than one message to process exportID,
1134  // then it is possible that the data are not contiguous.
1135  // Check by seeing if the previous process ID in the list
1136  // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
1137  // because if starts[exportID] > 1, then i must be > 1 (since
1138  // the starts array was filled with zeros initially).
1139 
1140  // null entries break continuity.
1141  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
1142  if (needSendBuff == 0 && starts[exportID] > 1 &&
1143  exportID != exportProcIDs[i-1]) {
1144  needSendBuff = 1;
1145  }
1146  ++numActive;
1147  }
1148  }
1149 
1150 #ifdef HAVE_TPETRA_DEBUG
1151  // Test whether any process in the communicator got an invalid
1152  // process ID. If badID != -1 on this process, then it equals
1153  // this process' rank. The max of all badID over all processes is
1154  // the max rank which has an invalid process ID.
1155  {
1156  int gbl_badID;
1157  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1158  TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
1159  Teuchos::typeName(*this) << "::createFromSends: Proc " << gbl_badID
1160  << ", perhaps among other processes, got a bad send process ID.");
1161  }
1162 #else
1163  // FIXME (mfh 12 Apr 2013, 15 Jul 2015) Rather than simply
1164  // ignoring this information, we should think about how to pass it
1165  // along so that all the processes find out about it. In a
1166  // release build with efficiency warnings turned off, the next
1167  // collective communication happens in computeReceives(). We
1168  // could figure out how to encode the error flag in that
1169  // operation, for example by adding an extra entry to the
1170  // collective's output array that encodes the error condition (0
1171  // on all processes if no error, else 1 on any process with the
1172  // error, so that the sum will produce a nonzero value if any
1173  // process had an error). I'll defer this change for now and
1174  // recommend instead that people with troubles try a debug build.
1175 #endif // HAVE_TPETRA_DEBUG
1176 
1177 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1178  {
1179  int global_needSendBuff;
1180  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1181  outArg (global_needSendBuff));
1182  TPETRA_EFFICIENCY_WARNING(
1183  global_needSendBuff != 0, std::runtime_error,
1184  "::createFromSends: Grouping export IDs together by process rank often "
1185  "improves performance.");
1186  }
1187 #endif
1188 
1189  // Determine from the caller's data whether or not the current
1190  // process should send (a) message(s) to itself.
1191  if (starts[myProcID] != 0) {
1192  selfMessage_ = true;
1193  }
1194  else {
1195  selfMessage_ = false;
1196  }
1197 
1198 #ifdef HAVE_TEUCHOS_DEBUG
1199  bool index_neq_numActive = false;
1200  bool send_neq_numSends = false;
1201 #endif
1202  if (! needSendBuff) {
1203  // grouped by proc, no send buffer or indicesTo_ needed
1204  numSends_ = 0;
1205  // Count total number of sends, i.e., total number of procs to
1206  // which we are sending. This includes myself, if applicable.
1207  for (int i = 0; i < numProcs; ++i) {
1208  if (starts[i]) {
1209  ++numSends_;
1210  }
1211  }
1212 
1213  // Not only do we not need these, but we must clear them, as
1214  // empty status of indicesTo is a flag used later.
1215  indicesTo_.resize(0);
1216  // Size these to numSends_; note, at the moment, numSends_
1217  // includes self sends. Set their values to zeros.
1218  procsTo_.assign(numSends_,0);
1219  startsTo_.assign(numSends_,0);
1220  lengthsTo_.assign(numSends_,0);
1221 
1222  // set startsTo to the offset for each send (i.e., each proc ID)
1223  // set procsTo to the proc ID for each send
1224  // in interpreting this code, remember that we are assuming contiguity
1225  // that is why index skips through the ranks
1226  {
1227  size_t index = 0, procIndex = 0;
1228  for (size_t i = 0; i < numSends_; ++i) {
1229  while (exportProcIDs[procIndex] < 0) {
1230  ++procIndex; // skip all negative proc IDs
1231  }
1232  startsTo_[i] = procIndex;
1233  int procID = exportProcIDs[procIndex];
1234  procsTo_[i] = procID;
1235  index += starts[procID];
1236  procIndex += starts[procID];
1237  }
1238 #ifdef HAVE_TEUCHOS_DEBUG
1239  if (index != numActive) {
1240  index_neq_numActive = true;
1241  }
1242 #endif
1243  }
1244  // sort the startsTo and proc IDs together, in ascending order, according
1245  // to proc IDs
1246  if (numSends_ > 0) {
1247  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1248  }
1249  // compute the maximum send length
1250  maxSendLength_ = 0;
1251  for (size_t i = 0; i < numSends_; ++i) {
1252  int procID = procsTo_[i];
1253  lengthsTo_[i] = starts[procID];
1254  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1255  maxSendLength_ = lengthsTo_[i];
1256  }
1257  }
1258  }
1259  else {
1260  // not grouped by proc, need send buffer and indicesTo_
1261 
1262  // starts[i] is the number of sends to proc i
1263  // numActive equals number of sends total, \sum_i starts[i]
1264 
1265  // this loop starts at starts[1], so explicitly check starts[0]
1266  if (starts[0] == 0 ) {
1267  numSends_ = 0;
1268  }
1269  else {
1270  numSends_ = 1;
1271  }
1272  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1273  im1=starts.begin();
1274  i != starts.end(); ++i)
1275  {
1276  if (*i != 0) ++numSends_;
1277  *i += *im1;
1278  im1 = i;
1279  }
1280  // starts[i] now contains the number of exports to procs 0 through i
1281 
1282  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1283  i=starts.rbegin()+1;
1284  i != starts.rend(); ++i)
1285  {
1286  *ip1 = *i;
1287  ip1 = i;
1288  }
1289  starts[0] = 0;
1290  // starts[i] now contains the number of exports to procs 0 through
1291  // i-1, i.e., all procs before proc i
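 // Editor's note (worked example, not in the original source): for
 // exportProcIDs = [2, 0, 2, 1] on three processes, the counting loop
 // gives starts = [1, 1, 2, 0]. The forward pass above turns this into
 // inclusive prefix sums [1, 2, 4, 4]; the backward shift plus
 // starts[0] = 0 yields exclusive offsets [0, 1, 2, 4], i.e., where each
 // destination process's block begins in the send buffer.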
1292 
1293  indicesTo_.resize(numActive);
1294 
1295  for (size_t i = 0; i < numExports; ++i) {
1296  if (exportProcIDs[i] >= 0) {
1297  // record the offset to the sendBuffer for this export
1298  indicesTo_[starts[exportProcIDs[i]]] = i;
1299  // now increment the offset for this proc
1300  ++starts[exportProcIDs[i]];
1301  }
1302  }
1303  // our send buffer will contain the export data for each of the procs
1304  // we communicate with, in order by proc id
1305  // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
1306  // indicesTo now maps each export to the location in our send buffer
1307  // associated with the export
1308  // data for export i located at sendBuffer[indicesTo[i]]
1309  //
1310  // starts[i] once again contains the number of exports to
1311  // procs 0 through i
1312  for (int proc = numProcs-1; proc != 0; --proc) {
1313  starts[proc] = starts[proc-1];
1314  }
1315  starts.front() = 0;
1316  starts[numProcs] = numActive;
1317  //
1318  // starts[proc] once again contains the number of exports to
1319  // procs 0 through proc-1
1320  // i.e., the start of my data in the sendBuffer
1321 
1322  // this contains invalid data at procs we don't care about, that is okay
1323  procsTo_.resize(numSends_);
1324  startsTo_.resize(numSends_);
1325  lengthsTo_.resize(numSends_);
1326 
1327  // for each group of sends/exports, record the destination proc,
1328  // the length, and the offset for this send into the
1329  // send buffer (startsTo_)
1330  maxSendLength_ = 0;
1331  size_t snd = 0;
1332  for (int proc = 0; proc < numProcs; ++proc ) {
1333  if (starts[proc+1] != starts[proc]) {
1334  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1335  startsTo_[snd] = starts[proc];
1336  // record max length for all off-proc sends
1337  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1338  maxSendLength_ = lengthsTo_[snd];
1339  }
1340  procsTo_[snd] = proc;
1341  ++snd;
1342  }
1343  }
1344 #ifdef HAVE_TEUCHOS_DEBUG
1345  if (snd != numSends_) {
1346  send_neq_numSends = true;
1347  }
1348 #endif
1349  }
1350 #ifdef HAVE_TEUCHOS_DEBUG
1351  SHARED_TEST_FOR_EXCEPTION(index_neq_numActive, std::logic_error,
1352  "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1353  SHARED_TEST_FOR_EXCEPTION(send_neq_numSends, std::logic_error,
1354  "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1355 #endif
1356 
1357  if (selfMessage_) --numSends_;
1358 
1359  // Invert map to see what msgs are received and what length
1360  computeReceives();
1361 
1362  if (verbose_) {
1363  std::ostringstream os;
1364  os << *prefix << "Done!" << endl;
1365  *out_ << os.str ();
1366  }
1367 
1368  // createFromRecvs() calls createFromSends(), but will set
1369  // howInitialized_ again after calling createFromSends().
1370  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1371 
1372  return totalReceiveLength_;
1373  }
1374 
1375  void
1376  Distributor::
1377  createFromSendsAndRecvs (const Teuchos::ArrayView<const int>& exportProcIDs,
1378  const Teuchos::ArrayView<const int>& remoteProcIDs)
1379  {
1380  // Note: exportProcIDs and remoteProcIDs _must_ have an entry for each
1381  // GID. If the export/remote proc IDs are taken from the getProcs{From|To}
1382  // lists extracted from a previous Distributor, this method will give a
1383  // wrong answer, because those lists have only one entry per process ID.
1384  // A version of this method that takes lengthsTo and lengthsFrom should
1385  // be written.
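 // Editor's note (illustrative example, not in the original source): on two
 // processes, if process 0 exports GIDs {0, 1} to process 1 and receives
 // GIDs {2, 3} from process 1, then on process 0 a valid call uses
 // exportProcIDs = [1, 1] (one entry per exported GID) and
 // remoteProcIDs = [1, 1] (one entry per received GID, sorted by rank),
 // not the deduplicated lists returned by getProcsTo() / getProcsFrom().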
1386 
1387  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1388 
1389 
1390  int myProcID = comm_->getRank ();
1391  int numProcs = comm_->getSize();
1392 
1393  const size_t numExportIDs = exportProcIDs.size();
1394  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1395 
1396  size_t numActive = 0;
1397  int needSendBuff = 0; // Boolean
1398 
1399  for(size_t i = 0; i < numExportIDs; i++ )
1400  {
1401  if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1402  needSendBuff = 1;
1403  if( exportProcIDs[i] >= 0 )
1404  {
1405  ++starts[ exportProcIDs[i] ];
1406  ++numActive;
1407  }
1408  }
1409 
1410  selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
1411 
1412  numSends_ = 0;
1413 
1414  if( needSendBuff ) // not grouped by proc; need a send buffer and indicesTo_
1415  {
1416  if (starts[0] == 0 ) {
1417  numSends_ = 0;
1418  }
1419  else {
1420  numSends_ = 1;
1421  }
1422  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1423  im1=starts.begin();
1424  i != starts.end(); ++i)
1425  {
1426  if (*i != 0) ++numSends_;
1427  *i += *im1;
1428  im1 = i;
1429  }
1430  // starts[i] now contains the number of exports to procs 0 through i
1431 
1432  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1433  i=starts.rbegin()+1;
1434  i != starts.rend(); ++i)
1435  {
1436  *ip1 = *i;
1437  ip1 = i;
1438  }
1439  starts[0] = 0;
1440  // starts[i] now contains the number of exports to procs 0 through
1441  // i-1, i.e., all procs before proc i
1442 
1443  indicesTo_.resize(numActive);
1444 
1445  for (size_t i = 0; i < numExportIDs; ++i) {
1446  if (exportProcIDs[i] >= 0) {
1447  // record the offset to the sendBuffer for this export
1448  indicesTo_[starts[exportProcIDs[i]]] = i;
1449  // now increment the offset for this proc
1450  ++starts[exportProcIDs[i]];
1451  }
1452  }
1453  for (int proc = numProcs-1; proc != 0; --proc) {
1454  starts[proc] = starts[proc-1];
1455  }
1456  starts.front() = 0;
1457  starts[numProcs] = numActive;
1458  procsTo_.resize(numSends_);
1459  startsTo_.resize(numSends_);
1460  lengthsTo_.resize(numSends_);
1461  maxSendLength_ = 0;
1462  size_t snd = 0;
1463  for (int proc = 0; proc < numProcs; ++proc ) {
1464  if (starts[proc+1] != starts[proc]) {
1465  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1466  startsTo_[snd] = starts[proc];
1467  // record max length for all off-proc sends
1468  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1469  maxSendLength_ = lengthsTo_[snd];
1470  }
1471  procsTo_[snd] = proc;
1472  ++snd;
1473  }
1474  }
1475  }
1476  else {
1477  // grouped by proc, no send buffer or indicesTo_ needed
1478  numSends_ = 0;
1479  // Count total number of sends, i.e., total number of procs to
1480  // which we are sending. This includes myself, if applicable.
1481  for (int i = 0; i < numProcs; ++i) {
1482  if (starts[i]) {
1483  ++numSends_;
1484  }
1485  }
1486 
 1487  // Not only do we not need these, but we must clear them: an
 1488  // empty indicesTo_ is used as a flag later on.
1489  indicesTo_.resize(0);
1490  // Size these to numSends_; note, at the moment, numSends_
1491  // includes self sends. Set their values to zeros.
1492  procsTo_.assign(numSends_,0);
1493  startsTo_.assign(numSends_,0);
1494  lengthsTo_.assign(numSends_,0);
1495 
1496  // set startsTo to the offset for each send (i.e., each proc ID)
1497  // set procsTo to the proc ID for each send
 1498  // In interpreting this code, remember that we are assuming
 1499  // contiguity; that is why procIndex skips ahead through the ranks.
1500  {
1501  size_t index = 0, procIndex = 0;
1502  for (size_t i = 0; i < numSends_; ++i) {
1503  while (exportProcIDs[procIndex] < 0) {
1504  ++procIndex; // skip all negative proc IDs
1505  }
1506  startsTo_[i] = procIndex;
1507  int procID = exportProcIDs[procIndex];
1508  procsTo_[i] = procID;
1509  index += starts[procID];
1510  procIndex += starts[procID];
1511  }
1512  }
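  // Illustration: for grouped input exportProcIDs = {1, 1, 3, 3, 3},
  // this walk yields procsTo_ = {1, 3} and startsTo_ = {0, 2};
  // lengthsTo_ = {2, 3} is filled in from starts[] just below.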
1513  // sort the startsTo and proc IDs together, in ascending order, according
1514  // to proc IDs
1515  if (numSends_ > 0) {
1516  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1517  }
1518  // compute the maximum send length
1519  maxSendLength_ = 0;
1520  for (size_t i = 0; i < numSends_; ++i) {
1521  int procID = procsTo_[i];
1522  lengthsTo_[i] = starts[procID];
1523  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1524  maxSendLength_ = lengthsTo_[i];
1525  }
1526  }
1527  }
1528 
1529 
1530  numSends_ -= selfMessage_;
1531  std::vector<int> recv_list;
 1532  recv_list.reserve(numSends_); // reserve an initial guess for the size needed
1533 
1534  int last_pid=-2;
1535  for(int i=0; i<remoteProcIDs.size(); i++) {
1536  if(remoteProcIDs[i]>last_pid) {
1537  recv_list.push_back(remoteProcIDs[i]);
1538  last_pid = remoteProcIDs[i];
1539  }
1540  else if (remoteProcIDs[i]<last_pid)
 1541  throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected remoteProcIDs to be in sorted order");
1542  }
1543  numReceives_ = recv_list.size();
1544  if(numReceives_) {
1545  procsFrom_.assign(numReceives_,0);
1546  lengthsFrom_.assign(numReceives_,0);
1547  indicesFrom_.assign(numReceives_,0);
1548  startsFrom_.assign(numReceives_,0);
1549  }
1550  for(size_t i=0,j=0; i<numReceives_; ++i) {
1551  int jlast=j;
1552  procsFrom_[i] = recv_list[i];
1553  startsFrom_[i] = j;
 1554  for ( ; j < (size_t) remoteProcIDs.size() &&
 1555  remoteProcIDs[jlast] == remoteProcIDs[j]; ++j) {} // empty body: j advances past this receive's entries
1556  lengthsFrom_[i] = j-jlast;
1557  }
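  // Illustration: for remoteProcIDs = {0, 0, 2, 2, 2}, this loop
  // produces procsFrom_ = {0, 2}, startsFrom_ = {0, 2}, and
  // lengthsFrom_ = {2, 3}; totalReceiveLength_ is then 5.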
1558  totalReceiveLength_ = remoteProcIDs.size();
1559  indicesFrom_.clear ();
1560  // NOTE (mfh 13 Feb 2019): Epetra_MpiDistributor deliberately does
1561  // _not_ fill indicesFrom_ (what it calls "indices_from_") like
1562  // this; it leaves indicesFrom_ empty. The comment there mentions
1563  // that not filling indicesFrom_ helps reverse mode correctness.
1564 #if 0
1565  indicesFrom_.reserve (totalReceiveLength_);
1566  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1567  indicesFrom_.push_back(i);
1568  }
1569 #endif // 0
1570  numReceives_-=selfMessage_;
1571  }
1572 
1573 } // namespace Tpetra
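A minimal calling sketch for createFromSendsAndRecvs (illustrative only; the helper name setupPlan, the communicator argument, and the example rank lists are assumptions, not part of this file). It follows the one-entry-per-ID convention that the comments above require:

#include "Tpetra_Distributor.hpp"
#include "Teuchos_Array.hpp"
#include "Teuchos_Comm.hpp"
#include "Teuchos_RCP.hpp"

// Hypothetical helper: build a communication plan on an existing communicator.
void setupPlan (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
{
  Tpetra::Distributor distributor (comm);

  // One entry per exported ID (its destination rank), grouped by rank,
  // and one entry per received ID (its source rank), in sorted order.
  // The particular values are made up for illustration.
  Teuchos::Array<int> exportProcIDs;  // e.g. {1, 1, 2}
  Teuchos::Array<int> remoteProcIDs;  // e.g. {1, 2, 2}

  distributor.createFromSendsAndRecvs (exportProcIDs (), remoteProcIDs ());

  // Read back the receive side of the plan.
  const size_t numRecvs = distributor.getNumReceives ();
  const size_t totalRecvLength = distributor.getTotalReceiveLength ();
  (void) numRecvs;
  (void) totalRecvLength;
}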
size_t getNumReceives() const
The number of processes from which we will receive data.
std::string description() const
Return a one-line description of this object.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
Teuchos::ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
Teuchos::ArrayView< const int > getProcsFrom() const
Ranks of the processes sending values to this process.
size_t createFromSends(const Teuchos::ArrayView< const int > &exportProcIDs)
Set up Distributor using list of process ranks to which this process will send.
void gathervPrint(std::ostream &out, const std::string &s, const Teuchos::Comm< int > &comm)
On Process 0 in the given communicator, print strings from each process in that communicator, in rank order.
Teuchos::ArrayView< const int > getProcsTo() const
Ranks of the processes to which this process will send values.
void createFromSendsAndRecvs(const Teuchos::ArrayView< const int > &exportProcIDs, const Teuchos::ArrayView< const int > &remoteProcIDs)
Set up Distributor using list of process ranks to which to send, and list of process ranks from which...
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
Sets up and executes a communication plan for a Tpetra DistObject.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
Teuchos::ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
size_t getNumSends() const
The number of processes to which we will send data.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Describe this object in a human-readable way to the given output stream.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
Teuchos::RCP< Distributor > getReverse() const
A reverse communication plan Distributor.
EDistributorSendType
The type of MPI send that Distributor should use.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.