Tpetra_Distributor.cpp
1 // ***********************************************************************
2 //
3 // Tpetra: Templated Linear Algebra Services Package
4 // Copyright (2008) Sandia Corporation
5 //
6 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7 // the U.S. Government retains certain rights in this software.
8 //
9 // Redistribution and use in source and binary forms, with or without
10 // modification, are permitted provided that the following conditions are
11 // met:
12 //
13 // 1. Redistributions of source code must retain the above copyright
14 // notice, this list of conditions and the following disclaimer.
15 //
16 // 2. Redistributions in binary form must reproduce the above copyright
17 // notice, this list of conditions and the following disclaimer in the
18 // documentation and/or other materials provided with the distribution.
19 //
20 // 3. Neither the name of the Corporation nor the names of the
21 // contributors may be used to endorse or promote products derived from
22 // this software without specific prior written permission.
23 //
24 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
25 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
28 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 //
36 // ************************************************************************
37 // @HEADER
38 
39 #include "Tpetra_Distributor.hpp"
42 #include "Tpetra_Util.hpp"
43 #include "Tpetra_Details_makeValidVerboseStream.hpp"
44 #include "Teuchos_StandardParameterEntryValidators.hpp"
45 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
46 #include <numeric>
47 
48 namespace Tpetra {
49  namespace Details {
50  std::string
51  DistributorSendTypeEnumToString (EDistributorSendType sendType)
52  {
53  if (sendType == DISTRIBUTOR_ISEND) {
54  return "Isend";
55  }
56  else if (sendType == DISTRIBUTOR_RSEND) {
57  return "Rsend";
58  }
59  else if (sendType == DISTRIBUTOR_SEND) {
60  return "Send";
61  }
62  else if (sendType == DISTRIBUTOR_SSEND) {
63  return "Ssend";
64  }
65  else {
66  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
67  "EDistributorSendType enum value " << sendType << ".");
68  }
69  }
70 
71  std::string
72  DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
73  {
74  switch (how) {
75  case Details::DISTRIBUTOR_NOT_INITIALIZED:
76  return "Not initialized yet";
77  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
78  return "By createFromSends";
79  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
80  return "By createFromRecvs";
81  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
82  return "By createFromSendsAndRecvs";
83  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
84  return "By createReverseDistributor";
85  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
86  return "By copy constructor";
87  default:
88  return "INVALID";
89  }
90  }
91  } // namespace Details
92 
93  Teuchos::Array<std::string>
94  distributorSendTypes ()
95  {
96  Teuchos::Array<std::string> sendTypes;
97  sendTypes.push_back ("Isend");
98  sendTypes.push_back ("Rsend");
99  sendTypes.push_back ("Send");
100  sendTypes.push_back ("Ssend");
101  return sendTypes;
102  }
103 
104  // We set default values of Distributor's Boolean parameters here,
105  // in this one place. That way, if we want to change the default
106  // value of a parameter, we don't have to search the whole file to
107  // ensure a consistent setting.
108  namespace {
109  // Default value of the "Debug" parameter.
110  const bool tpetraDistributorDebugDefault = false;
111  // Default value of the "Barrier between receives and sends" parameter.
112  const bool barrierBetween_default = false;
113  // Default value of the "Use distinct tags" parameter.
114  const bool useDistinctTags_default = true;
115  } // namespace (anonymous)
116 
117  int Distributor::getTag (const int pathTag) const {
118  return useDistinctTags_ ? pathTag : comm_->getTag ();
119  }
120 
121 
122 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
123  void Distributor::makeTimers () {
124  timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (
125  "Tpetra::Distributor: doWaits");
126 
127  timer_doPosts3TA_ = Teuchos::TimeMonitor::getNewTimer (
128  "Tpetra::Distributor: doPosts(3) TA");
129  timer_doPosts4TA_ = Teuchos::TimeMonitor::getNewTimer (
130  "Tpetra::Distributor: doPosts(4) TA");
131 
132  timer_doPosts3TA_recvs_ = Teuchos::TimeMonitor::getNewTimer (
133  "Tpetra::Distributor: doPosts(3): recvs TA");
134  timer_doPosts4TA_recvs_ = Teuchos::TimeMonitor::getNewTimer (
135  "Tpetra::Distributor: doPosts(4): recvs TA");
136 
137  timer_doPosts3TA_barrier_ = Teuchos::TimeMonitor::getNewTimer (
138  "Tpetra::Distributor: doPosts(3): barrier TA");
139  timer_doPosts4TA_barrier_ = Teuchos::TimeMonitor::getNewTimer (
140  "Tpetra::Distributor: doPosts(4): barrier TA");
141 
142  timer_doPosts3TA_sends_ = Teuchos::TimeMonitor::getNewTimer (
143  "Tpetra::Distributor: doPosts(3): sends TA");
144  timer_doPosts4TA_sends_ = Teuchos::TimeMonitor::getNewTimer (
145  "Tpetra::Distributor: doPosts(4): sends TA");
146  timer_doPosts3TA_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
147  "Tpetra::Distributor: doPosts(3): sends TA SLOW");
148  timer_doPosts4TA_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
149  "Tpetra::Distributor: doPosts(4): sends TA SLOW");
150  timer_doPosts3TA_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
151  "Tpetra::Distributor: doPosts(3): sends TA FAST");
152  timer_doPosts4TA_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
153  "Tpetra::Distributor: doPosts(4): sends TA FAST");
154 
155  timer_doPosts3KV_ = Teuchos::TimeMonitor::getNewTimer (
156  "Tpetra::Distributor: doPosts(3) KV");
157  timer_doPosts4KV_ = Teuchos::TimeMonitor::getNewTimer (
158  "Tpetra::Distributor: doPosts(4) KV");
159 
160  timer_doPosts3KV_recvs_ = Teuchos::TimeMonitor::getNewTimer (
161  "Tpetra::Distributor: doPosts(3): recvs KV");
162  timer_doPosts4KV_recvs_ = Teuchos::TimeMonitor::getNewTimer (
163  "Tpetra::Distributor: doPosts(4): recvs KV");
164 
165  timer_doPosts3KV_barrier_ = Teuchos::TimeMonitor::getNewTimer (
166  "Tpetra::Distributor: doPosts(3): barrier KV");
167  timer_doPosts4KV_barrier_ = Teuchos::TimeMonitor::getNewTimer (
168  "Tpetra::Distributor: doPosts(4): barrier KV");
169 
170  timer_doPosts3KV_sends_ = Teuchos::TimeMonitor::getNewTimer (
171  "Tpetra::Distributor: doPosts(3): sends KV");
172  timer_doPosts4KV_sends_ = Teuchos::TimeMonitor::getNewTimer (
173  "Tpetra::Distributor: doPosts(4): sends KV");
174  timer_doPosts3KV_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
175  "Tpetra::Distributor: doPosts(3): sends KV SLOW");
176  timer_doPosts4KV_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
177  "Tpetra::Distributor: doPosts(4): sends KV SLOW");
178  timer_doPosts3KV_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
179  "Tpetra::Distributor: doPosts(3): sends KV FAST");
180  timer_doPosts4KV_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
181  "Tpetra::Distributor: doPosts(4): sends KV FAST");
182  }
183 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
184 
185  Distributor::
186  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
187  const Teuchos::RCP<Teuchos::FancyOStream>& /* out */,
188  const Teuchos::RCP<Teuchos::ParameterList>& plist)
189  : comm_ (comm)
190  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
191  , sendType_ (Details::DISTRIBUTOR_SEND)
192  , barrierBetween_ (barrierBetween_default)
193  , selfMessage_ (false)
194  , numSends_ (0)
195  , maxSendLength_ (0)
196  , numReceives_ (0)
197  , totalReceiveLength_ (0)
198  , lastRoundBytesSend_ (0)
199  , lastRoundBytesRecv_ (0)
200  , useDistinctTags_ (useDistinctTags_default)
201  {
202  this->setParameterList(plist);
203 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
204  makeTimers ();
205 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
206  }
207 
208  Distributor::
209  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
210  : Distributor (comm, Teuchos::null, Teuchos::null)
211  {}
212 
213  Distributor::
214  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
215  const Teuchos::RCP<Teuchos::FancyOStream>& out)
216  : Distributor (comm, out, Teuchos::null)
217  {}
218 
219  Distributor::
220  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
221  const Teuchos::RCP<Teuchos::ParameterList>& plist)
222  : Distributor (comm, Teuchos::null, plist)
223  {}
224 
225  Distributor::
226  Distributor (const Distributor& distributor)
227  : comm_ (distributor.comm_)
228  , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
229  , sendType_ (distributor.sendType_)
230  , barrierBetween_ (distributor.barrierBetween_)
231  , verbose_ (distributor.verbose_)
232  , selfMessage_ (distributor.selfMessage_)
233  , numSends_ (distributor.numSends_)
234  , procsTo_ (distributor.procsTo_)
235  , startsTo_ (distributor.startsTo_)
236  , lengthsTo_ (distributor.lengthsTo_)
237  , maxSendLength_ (distributor.maxSendLength_)
238  , indicesTo_ (distributor.indicesTo_)
239  , numReceives_ (distributor.numReceives_)
240  , totalReceiveLength_ (distributor.totalReceiveLength_)
241  , lengthsFrom_ (distributor.lengthsFrom_)
242  , procsFrom_ (distributor.procsFrom_)
243  , startsFrom_ (distributor.startsFrom_)
244  , indicesFrom_ (distributor.indicesFrom_)
245  , reverseDistributor_ (distributor.reverseDistributor_)
246  , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
247  , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
248  , useDistinctTags_ (distributor.useDistinctTags_)
249  {
250  using Teuchos::ParameterList;
251  using Teuchos::RCP;
252  using Teuchos::rcp;
253 
254  RCP<const ParameterList> rhsList = distributor.getParameterList ();
255  RCP<ParameterList> newList = rhsList.is_null () ? Teuchos::null :
256  Teuchos::parameterList (*rhsList);
257  this->setParameterList (newList);
258 
259 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
260  makeTimers ();
261 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
262  }
263 
264  void Distributor::swap (Distributor& rhs) {
265  using Teuchos::ParameterList;
266  using Teuchos::parameterList;
267  using Teuchos::RCP;
268 
269  std::swap (comm_, rhs.comm_);
270  std::swap (howInitialized_, rhs.howInitialized_);
271  std::swap (sendType_, rhs.sendType_);
272  std::swap (barrierBetween_, rhs.barrierBetween_);
273  std::swap (verbose_, rhs.verbose_);
274  std::swap (selfMessage_, rhs.selfMessage_);
275  std::swap (numSends_, rhs.numSends_);
276  std::swap (procsTo_, rhs.procsTo_);
277  std::swap (startsTo_, rhs.startsTo_);
278  std::swap (lengthsTo_, rhs.lengthsTo_);
279  std::swap (maxSendLength_, rhs.maxSendLength_);
280  std::swap (indicesTo_, rhs.indicesTo_);
281  std::swap (numReceives_, rhs.numReceives_);
282  std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
283  std::swap (lengthsFrom_, rhs.lengthsFrom_);
284  std::swap (procsFrom_, rhs.procsFrom_);
285  std::swap (startsFrom_, rhs.startsFrom_);
286  std::swap (indicesFrom_, rhs.indicesFrom_);
287  std::swap (reverseDistributor_, rhs.reverseDistributor_);
288  std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
289  std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
290  std::swap (useDistinctTags_, rhs.useDistinctTags_);
291 
292  // Swap parameter lists. If they are the same object, make a deep
293  // copy first, so that modifying one won't modify the other one.
294  RCP<ParameterList> lhsList = this->getNonconstParameterList ();
295  RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
296  if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
297  rhsList = parameterList (*rhsList);
298  }
299  if (! rhsList.is_null ()) {
300  this->setMyParamList (rhsList);
301  }
302  if (! lhsList.is_null ()) {
303  rhs.setMyParamList (lhsList);
304  }
305 
306  // We don't need to swap timers, because all instances of
307  // Distributor use the same timers.
308  }
309 
310  bool
311  Distributor::getVerbose()
312  {
313  return Details::Behavior::verbose("Distributor") ||
314  Details::Behavior::verbose("Tpetra::Distributor");
315  }
316 
317  std::unique_ptr<std::string>
318  Distributor::
319  createPrefix(const char methodName[]) const
320  {
321  return Details::createPrefix(
322  comm_.getRawPtr(), "Distributor", methodName);
323  }
324 
325  void
326  Distributor::
327  setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
328  {
329  using ::Tpetra::Details::Behavior;
330  using Teuchos::FancyOStream;
331  using Teuchos::getIntegralValue;
332  using Teuchos::includesVerbLevel;
333  using Teuchos::ParameterList;
334  using Teuchos::parameterList;
335  using Teuchos::RCP;
336  using std::endl;
337 
338  if (! plist.is_null()) {
339  RCP<const ParameterList> validParams = getValidParameters ();
340  plist->validateParametersAndSetDefaults (*validParams);
341 
342  const bool barrierBetween =
343  plist->get<bool> ("Barrier between receives and sends");
344  const Details::EDistributorSendType sendType =
345  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
346  const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
347  {
348  // mfh 03 May 2016: We keep this option only for backwards
349  // compatibility, but it must always be true. See discussion of
350  // Github Issue #227.
351  const bool enable_cuda_rdma =
352  plist->get<bool> ("Enable MPI CUDA RDMA support");
353  TEUCHOS_TEST_FOR_EXCEPTION
354  (! enable_cuda_rdma, std::invalid_argument, "Tpetra::Distributor::"
355  "setParameterList: " << "You specified \"Enable MPI CUDA RDMA "
356  "support\" = false. This is no longer valid. You don't need to "
357  "specify this option any more; Tpetra assumes it is always true. "
358  "This is a very light assumption on the MPI implementation, and in "
359  "fact does not actually involve hardware or system RDMA support. "
360  "Tpetra just assumes that the MPI implementation can tell whether a "
361  "pointer points to host memory or CUDA device memory.");
362  }
363 
364  // We check this property explicitly, since we haven't yet learned
365  // how to make a validator that can cross-check properties.
366  // Later, turn this into a validator so that it can be embedded in
367  // the valid ParameterList and used in Optika.
368  TEUCHOS_TEST_FOR_EXCEPTION
369  (! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
370  std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
371  << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
372  "between receives and sends." << endl << "This is invalid; you must "
373  "include the barrier if you use ready sends." << endl << "Ready sends "
374  "require that their corresponding receives have already been posted, "
375  "and the only way to guarantee that in general is with a barrier.");
376 
377  // Now that we've validated the input list, save the results.
378  sendType_ = sendType;
379  barrierBetween_ = barrierBetween;
380  useDistinctTags_ = useDistinctTags;
381 
382  // ParameterListAcceptor semantics require pointer identity of the
383  // sublist passed to setParameterList(), so we save the pointer.
384  this->setMyParamList (plist);
385  }
386  }
387 
388  Teuchos::RCP<const Teuchos::ParameterList>
389  Distributor::getValidParameters () const
390  {
391  using Teuchos::Array;
392  using Teuchos::ParameterList;
393  using Teuchos::parameterList;
394  using Teuchos::RCP;
395  using Teuchos::setStringToIntegralParameter;
396 
397  const bool barrierBetween = barrierBetween_default;
398  const bool useDistinctTags = useDistinctTags_default;
399  const bool debug = tpetraDistributorDebugDefault;
400 
401  Array<std::string> sendTypes = distributorSendTypes ();
402  const std::string defaultSendType ("Send");
403  Array<Details::EDistributorSendType> sendTypeEnums;
404  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
405  sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
406  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
407  sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
408 
409  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
410  plist->set ("Barrier between receives and sends", barrierBetween,
411  "Whether to execute a barrier between receives and sends in do"
412  "[Reverse]Posts(). Required for correctness when \"Send type\""
413  "=\"Rsend\", otherwise correct but not recommended.");
414  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
415  defaultSendType, "When using MPI, the variant of send to use in "
416  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
417  plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
418  "MPI message tags for different code paths. Highly recommended"
419  " to avoid message collisions.");
420  plist->set ("Debug", debug, "Whether to print copious debugging output on "
421  "all processes.");
422  plist->set ("Timer Label","","Label for Time Monitor output");
423  plist->set ("Enable MPI CUDA RDMA support", true, "Assume that MPI can "
424  "tell whether a pointer points to host memory or CUDA device "
425  "memory. You don't need to specify this option any more; "
426  "Tpetra assumes it is always true. This is a very light "
427  "assumption on the MPI implementation, and in fact does not "
428  "actually involve hardware or system RDMA support.");
429 
430  // mfh 24 Dec 2015: Tpetra no longer inherits from
431  // Teuchos::VerboseObject, so it doesn't need the "VerboseObject"
432  // sublist. However, we retain the "VerboseObject" sublist
433  // anyway, for backwards compatibility (otherwise the above
434  // validation would fail with an invalid parameter name, should
435  // the user still want to provide this list).
436  Teuchos::setupVerboseObjectSublist (&*plist);
437  return Teuchos::rcp_const_cast<const ParameterList> (plist);
438  }
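  // Illustrative usage sketch (not part of the original source): how a
  // caller might override the defaults registered above.  The parameter
  // names are the ones defined in getValidParameters(); the variables
  // comm and plist are hypothetical.
  //
  //   Teuchos::RCP<Teuchos::ParameterList> plist =
  //     Teuchos::parameterList ("Tpetra::Distributor");
  //   plist->set ("Send type", std::string ("Isend"));
  //   plist->set ("Use distinct tags", true);
  //   Tpetra::Distributor distributor (comm, plist);
  //
  // Parameters left unset keep the defaults above; setParameterList()
  // validates the list against getValidParameters().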
439 
440 
441  size_t Distributor::getTotalReceiveLength() const
442  { return totalReceiveLength_; }
443 
444  size_t Distributor::getNumReceives() const
445  { return numReceives_; }
446 
447  bool Distributor::hasSelfMessage() const
448  { return selfMessage_; }
449 
450  size_t Distributor::getNumSends() const
451  { return numSends_; }
452 
453  size_t Distributor::getMaxSendLength() const
454  { return maxSendLength_; }
455 
456  Teuchos::ArrayView<const int> Distributor::getProcsFrom() const
457  { return procsFrom_; }
458 
459  Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
460  { return lengthsFrom_; }
461 
462  Teuchos::ArrayView<const int> Distributor::getProcsTo() const
463  { return procsTo_; }
464 
465  Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
466  { return lengthsTo_; }
467 
468  Teuchos::RCP<Distributor>
469  Distributor::getReverse(bool create) const {
470  if (reverseDistributor_.is_null () && create) {
471  createReverseDistributor ();
472  }
473  TEUCHOS_TEST_FOR_EXCEPTION
474  (reverseDistributor_.is_null () && create, std::logic_error, "The reverse "
475  "Distributor is null after createReverseDistributor returned. "
476  "Please report this bug to the Tpetra developers.");
477  return reverseDistributor_;
478  }
479 
480 
481  void
482  Distributor::createReverseDistributor() const
483  {
484  reverseDistributor_ = Teuchos::rcp(new Distributor(comm_));
485  reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
486  reverseDistributor_->sendType_ = sendType_;
487  reverseDistributor_->barrierBetween_ = barrierBetween_;
488  reverseDistributor_->verbose_ = verbose_;
489 
490  // The total length of all the sends of this Distributor. We
491  // calculate it because it's the total length of all the receives
492  // of the reverse Distributor.
493  size_t totalSendLength =
494  std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
495 
496  // The maximum length of any of the receives of this Distributor.
497  // We calculate it because it's the maximum length of any of the
498  // sends of the reverse Distributor.
499  size_t maxReceiveLength = 0;
500  const int myProcID = comm_->getRank();
501  for (size_t i=0; i < numReceives_; ++i) {
502  if (procsFrom_[i] != myProcID) {
503  // Don't count receives for messages sent by myself to myself.
504  if (lengthsFrom_[i] > maxReceiveLength) {
505  maxReceiveLength = lengthsFrom_[i];
506  }
507  }
508  }
509 
510  // Initialize all of reverseDistributor's data members. This
511  // mainly just involves flipping "send" and "receive," or the
512  // equivalent "to" and "from."
513 
514  reverseDistributor_->selfMessage_ = selfMessage_;
515  reverseDistributor_->numSends_ = numReceives_;
516  reverseDistributor_->procsTo_ = procsFrom_;
517  reverseDistributor_->startsTo_ = startsFrom_;
518  reverseDistributor_->lengthsTo_ = lengthsFrom_;
519  reverseDistributor_->maxSendLength_ = maxReceiveLength;
520  reverseDistributor_->indicesTo_ = indicesFrom_;
521  reverseDistributor_->numReceives_ = numSends_;
522  reverseDistributor_->totalReceiveLength_ = totalSendLength;
523  reverseDistributor_->lengthsFrom_ = lengthsTo_;
524  reverseDistributor_->procsFrom_ = procsTo_;
525  reverseDistributor_->startsFrom_ = startsTo_;
526  reverseDistributor_->indicesFrom_ = indicesTo_;
527 
528  // requests_: Allocated on demand.
529  // reverseDistributor_: See note below
530 
531  // mfh 31 Mar 2016: These are statistics, kept on calls to
532  // doPostsAndWaits or doReversePostsAndWaits. They weren't here
533  // when I started, and I didn't add them, so I don't know if they
534  // are accurate.
535  reverseDistributor_->lastRoundBytesSend_ = 0;
536  reverseDistributor_->lastRoundBytesRecv_ = 0;
537 
538  reverseDistributor_->useDistinctTags_ = useDistinctTags_;
539 
540  // I am my reverse Distributor's reverse Distributor.
541  // Thus, it would be legit to do the following:
542  //
543  // reverseDistributor_->reverseDistributor_ = Teuchos::rcp (this, false);
544  //
545  // (Note use of a "weak reference" to avoid a circular RCP
546  // dependency.) The only issue is that if users hold on to the
547  // reverse Distributor but let go of the forward one, this
548  // reference won't be valid anymore. However, the reverse
549  // Distributor is really an implementation detail of Distributor
550  // and not meant to be used directly, so we don't need to do this.
551  reverseDistributor_->reverseDistributor_ = Teuchos::null;
552  }
553 
554  void
555  Distributor::doWaits ()
556  {
557  using Teuchos::Array;
558  using Teuchos::CommRequest;
559  using Teuchos::FancyOStream;
560  using Teuchos::includesVerbLevel;
561  using Teuchos::is_null;
562  using Teuchos::RCP;
563  using Teuchos::waitAll;
564  using std::endl;
565 
566 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
567  Teuchos::TimeMonitor timeMon (*timer_doWaits_);
568 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
569 
570  const bool debug = Details::Behavior::debug("Distributor");
571 
572  std::unique_ptr<std::string> prefix;
573  if (verbose_) {
574  prefix = createPrefix("doWaits");
575  std::ostringstream os;
576  os << *prefix << "Start: requests_.size(): "
577  << requests_.size() << endl;
578  std::cerr << os.str();
579  }
580 
581  if (requests_.size() > 0) {
582  waitAll(*comm_, requests_());
583 
584  if (debug) {
585  // Make sure that waitAll() nulled out all the requests.
586  for (auto it = requests_.begin(); it != requests_.end(); ++it) {
587  TEUCHOS_TEST_FOR_EXCEPTION
588  (! is_null(*it), std::runtime_error,
589  "Tpetra::Distributor::doWaits: Communication requests "
590  "should all be null aftr calling Teuchos::waitAll on "
591  "them, but at least one request is not null.");
592  }
593  }
594  // Restore the invariant that requests_.size() is the number of
595  // outstanding nonblocking communication requests.
596  requests_.resize (0);
597  }
598 
599  if (debug) {
600  const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
601  int globalSizeNonzero = 0;
602  Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
603  localSizeNonzero,
604  Teuchos::outArg (globalSizeNonzero));
605  TEUCHOS_TEST_FOR_EXCEPTION(
606  globalSizeNonzero != 0, std::runtime_error,
607  "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
608  "a nonzero number of outstanding posts. There should be none at this "
609  "point. Please report this bug to the Tpetra developers.");
610  }
611 
612  if (verbose_) {
613  std::ostringstream os;
614  os << *prefix << "Done" << endl;
615  std::cerr << os.str();
616  }
617  }
618 
619  void Distributor::doReverseWaits() {
620  // call doWaits() on the reverse Distributor, if it exists
621  if (! reverseDistributor_.is_null()) {
622  reverseDistributor_->doWaits();
623  }
624  }
625 
626  std::string Distributor::description () const {
627  std::ostringstream out;
628 
629  out << "\"Tpetra::Distributor\": {";
630  const std::string label = this->getObjectLabel ();
631  if (label != "") {
632  out << "Label: " << label << ", ";
633  }
634  out << "How initialized: "
636  << ", Parameters: {"
637  << "Send type: "
638  << DistributorSendTypeEnumToString (sendType_)
639  << ", Barrier between receives and sends: "
640  << (barrierBetween_ ? "true" : "false")
641  << ", Use distinct tags: "
642  << (useDistinctTags_ ? "true" : "false")
643  << ", Debug: " << (verbose_ ? "true" : "false")
644  << "}}";
645  return out.str ();
646  }
647 
648  std::string
649  Distributor::
650  localDescribeToString (const Teuchos::EVerbosityLevel vl) const
651  {
652  using Teuchos::toString;
653  using Teuchos::VERB_HIGH;
654  using Teuchos::VERB_EXTREME;
655  using std::endl;
656 
657  // This preserves current behavior of Distributor.
658  if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
659  return std::string ();
660  }
661 
662  auto outStringP = Teuchos::rcp (new std::ostringstream ());
663  auto outp = Teuchos::getFancyOStream (outStringP); // returns RCP
664  Teuchos::FancyOStream& out = *outp;
665 
666  const int myRank = comm_->getRank ();
667  const int numProcs = comm_->getSize ();
668  out << "Process " << myRank << " of " << numProcs << ":" << endl;
669  Teuchos::OSTab tab1 (out);
670 
671  out << "selfMessage: " << hasSelfMessage () << endl;
672  out << "numSends: " << getNumSends () << endl;
673  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
674  out << "procsTo: " << toString (procsTo_) << endl;
675  out << "lengthsTo: " << toString (lengthsTo_) << endl;
676  out << "maxSendLength: " << getMaxSendLength () << endl;
677  }
678  if (vl == VERB_EXTREME) {
679  out << "startsTo: " << toString (startsTo_) << endl;
680  out << "indicesTo: " << toString (indicesTo_) << endl;
681  }
682  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
683  out << "numReceives: " << getNumReceives () << endl;
684  out << "totalReceiveLength: " << getTotalReceiveLength () << endl;
685  out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
686  out << "startsFrom: " << toString (startsFrom_) << endl;
687  out << "procsFrom: " << toString (procsFrom_) << endl;
688  }
689 
690  out.flush (); // make sure the ostringstream got everything
691  return outStringP->str ();
692  }
693 
694  void
695  Distributor::
696  describe (Teuchos::FancyOStream& out,
697  const Teuchos::EVerbosityLevel verbLevel) const
698  {
699  using std::endl;
700  using Teuchos::VERB_DEFAULT;
701  using Teuchos::VERB_NONE;
702  using Teuchos::VERB_LOW;
703  using Teuchos::VERB_MEDIUM;
704  using Teuchos::VERB_HIGH;
705  using Teuchos::VERB_EXTREME;
706  const Teuchos::EVerbosityLevel vl =
707  (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
708 
709  if (vl == VERB_NONE) {
710  return; // don't print anything
711  }
712  // If this Distributor's Comm is null, then the calling
713  // process does not participate in Distributor-related collective
714  // operations with the other processes. In that case, it is not
715  // even legal to call this method. The reasonable thing to do in
716  // that case is nothing.
717  if (comm_.is_null ()) {
718  return;
719  }
720  const int myRank = comm_->getRank ();
721  const int numProcs = comm_->getSize ();
722 
723  // Only Process 0 should touch the output stream, but this method
724  // in general may need to do communication. Thus, we may need to
725  // preserve the current tab level across multiple "if (myRank ==
726  // 0) { ... }" inner scopes. This is why we sometimes create
727  // OSTab instances by pointer, instead of by value. We only need
728  // to create them by pointer if the tab level must persist through
729  // multiple inner scopes.
730  Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
731 
732  if (myRank == 0) {
733  // At every verbosity level but VERB_NONE, Process 0 prints.
734  // By convention, describe() always begins with a tab before
735  // printing.
736  tab0 = Teuchos::rcp (new Teuchos::OSTab (out));
737  // We quote the class name because it contains colons.
738  // This makes the output valid YAML.
739  out << "\"Tpetra::Distributor\":" << endl;
740  tab1 = Teuchos::rcp (new Teuchos::OSTab (out));
741 
742  const std::string label = this->getObjectLabel ();
743  if (label != "") {
744  out << "Label: " << label << endl;
745  }
746  out << "Number of processes: " << numProcs << endl
747  << "How initialized: "
749  << endl;
750  {
751  out << "Parameters: " << endl;
752  Teuchos::OSTab tab2 (out);
753  out << "\"Send type\": "
754  << DistributorSendTypeEnumToString (sendType_) << endl
755  << "\"Barrier between receives and sends\": "
756  << (barrierBetween_ ? "true" : "false") << endl
757  << "\"Use distinct tags\": "
758  << (useDistinctTags_ ? "true" : "false") << endl
759  << "\"Debug\": " << (verbose_ ? "true" : "false") << endl;
760  }
761  } // if myRank == 0
762 
763  // This is collective over the Map's communicator.
764  if (vl > VERB_LOW) {
765  const std::string lclStr = this->localDescribeToString (vl);
766  Tpetra::Details::gathervPrint (out, lclStr, *comm_);
767  }
768 
769  out << "Reverse Distributor:";
770  if (reverseDistributor_.is_null ()) {
771  out << " null" << endl;
772  }
773  else {
774  out << endl;
775  reverseDistributor_->describe (out, vl);
776  }
777  }
778 
779  void
780  Distributor::
781  computeReceives ()
782  {
783  using Teuchos::Array;
784  using Teuchos::ArrayRCP;
785  using Teuchos::as;
786  using Teuchos::CommStatus;
787  using Teuchos::CommRequest;
788  using Teuchos::ireceive;
789  using Teuchos::RCP;
790  using Teuchos::rcp;
791  using Teuchos::REDUCE_SUM;
792  using Teuchos::receive;
793  using Teuchos::reduce;
794  using Teuchos::scatter;
795  using Teuchos::send;
796  using Teuchos::waitAll;
797  using std::endl;
798 
799  const int myRank = comm_->getRank();
800  const int numProcs = comm_->getSize();
801 
802  // MPI tag for nonblocking receives and blocking sends in this method.
803  const int pathTag = 2;
804  const int tag = this->getTag (pathTag);
805 
806  std::unique_ptr<std::string> prefix;
807  if (verbose_) {
808  prefix = createPrefix("computeReceives");
809  std::ostringstream os;
810  os << *prefix
811  << "selfMessage_: " << (selfMessage_ ? "true" : "false")
812  << ", pathTag: " << pathTag << ", tag: " << tag << endl;
813  std::cerr << os.str();
814  }
815 
816  // toProcsFromMe[i] == the number of messages sent by this process
817  // to process i. The data in numSends_, procsTo_, and lengthsTo_
818  // concern the contiguous sends. Therefore, each process will be
819  // listed in procsTo_ at most once, and so toProcsFromMe[i] will
820  // either be 0 or 1.
821  {
822  Array<int> toProcsFromMe (numProcs, 0);
823 #ifdef HAVE_TEUCHOS_DEBUG
824  bool counting_error = false;
825 #endif // HAVE_TEUCHOS_DEBUG
826  for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
827 #ifdef HAVE_TEUCHOS_DEBUG
828  if (toProcsFromMe[procsTo_[i]] != 0) {
829  counting_error = true;
830  }
831 #endif // HAVE_TEUCHOS_DEBUG
832  toProcsFromMe[procsTo_[i]] = 1;
833  }
834 #ifdef HAVE_TEUCHOS_DEBUG
835  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
836  "Tpetra::Distributor::computeReceives: There was an error on at least "
837  "one process in counting the number of messages send by that process to "
838  "the other processs. Please report this bug to the Tpetra developers.",
839  *comm_);
840 #endif // HAVE_TEUCHOS_DEBUG
841 
842  if (verbose_) {
843  std::ostringstream os;
844  os << *prefix << "Reduce & scatter" << endl;
845  std::cerr << os.str();
846  }
847 
848  // Compute the number of receives that this process needs to
849  // post. The number of receives includes any self sends (i.e.,
850  // messages sent by this process to itself).
851  //
852  // (We will use numReceives_ below to post exactly that
853  // number of receives, with MPI_ANY_SOURCE as the sending rank.
854  // This will tell us from which processes this process expects
855  // to receive, and how many packets of data we expect to receive
856  // from each process.)
857  //
858  // toProcsFromMe[i] is the number of messages sent by this
859  // process to process i. Compute the sum (elementwise) of all
860  // the toProcsFromMe arrays on all processes in the
861  // communicator. If the array x is that sum, then if this
862  // process has rank j, x[j] is the number of messages sent
863  // to process j, that is, the number of receives on process j
864  // (including any messages sent by process j to itself).
865  //
866  // Yes, this requires storing and operating on an array of
867  // length P, where P is the number of processes in the
868  // communicator. Epetra does this too. Avoiding this O(P)
869  // memory bottleneck would require some research.
870  //
871  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
872  // implement this O(P) memory algorithm.
873  //
874  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
875  // process (0) from toProcsFromMe, to numRecvsOnEachProc.
876  // Then, scatter the latter, so that each process p gets
877  // numRecvsOnEachProc[p].
878  //
879  // 2. Like #1, but use MPI_Reduce_scatter instead of
880  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
881  // optimized to reduce the number of messages, but
882  // MPI_Reduce_scatter is more general than we need (it
883  // allows the equivalent of MPI_Scatterv). See Bug 6336.
884  //
885  // 3. Do an all-reduce on toProcsFromMe, and let my process
886  // (with rank myRank) get numReceives_ from
887  // toProcsFromMe[myRank]. The HPCCG miniapp uses the
888  // all-reduce method.
889  //
890  // Approaches 1 and 3 have the same critical path length.
891  // However, #3 moves more data. This is because the final
892  // result is just one integer, but #3 moves a whole array of
893  // results to all the processes. This is why we use Approach 1
894  // here.
895  //
896  // mfh 12 Apr 2013: See discussion in createFromSends() about
897  // how we could use this communication to propagate an error
898  // flag for "free" in a release build.
899 
900  const int root = 0; // rank of root process of the reduction
901  Array<int> numRecvsOnEachProc; // temp; only needed on root
902  if (myRank == root) {
903  numRecvsOnEachProc.resize (numProcs);
904  }
905  int numReceivesAsInt = 0; // output
906  reduce<int, int> (toProcsFromMe.getRawPtr (),
907  numRecvsOnEachProc.getRawPtr (),
908  numProcs, REDUCE_SUM, root, *comm_);
909  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
910  &numReceivesAsInt, 1, root, *comm_);
911  numReceives_ = static_cast<size_t> (numReceivesAsInt);
912  }
913 
914  // Now we know numReceives_, which is this process' number of
915  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
916  // with this number of entries.
917  lengthsFrom_.assign (numReceives_, 0);
918  procsFrom_.assign (numReceives_, 0);
919 
920  //
921  // Ask (via nonblocking receive) each process from which we are
922  // receiving how many packets we should expect from it in the
923  // communication pattern.
924  //
925 
926  // At this point, numReceives_ includes any self message that
927  // there may be. At the end of this routine, we'll subtract off
928  // the self message (if there is one) from numReceives_. In this
929  // routine, we don't need to receive a message from ourselves in
930  // order to figure out our lengthsFrom_ and source process ID; we
931  // can just ask ourselves directly. Thus, the actual number of
932  // nonblocking receives we post here does not include the self
933  // message.
934  const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
935 
936  // Teuchos' wrapper for nonblocking receives requires receive
937  // buffers that it knows won't go away. This is why we use RCPs,
938  // one RCP per nonblocking receive request. They get allocated in
939  // the loop below.
940  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
941  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
942  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
943 
944  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
945  // (receive data from any process).
946 #ifdef HAVE_MPI
947  const int anySourceProc = MPI_ANY_SOURCE;
948 #else
949  const int anySourceProc = -1;
950 #endif
951 
952  if (verbose_) {
953  std::ostringstream os;
954  os << *prefix << "Post " << actualNumReceives << " irecv"
955  << (actualNumReceives != size_t (1) ? "s" : "") << endl;
956  std::cerr << os.str();
957  }
958 
959  // Post the (nonblocking) receives.
960  for (size_t i = 0; i < actualNumReceives; ++i) {
961  // Once the receive completes, we can ask the corresponding
962  // CommStatus object (output by wait()) for the sending process'
963  // ID (which we'll assign to procsFrom_[i] -- don't forget to
964  // do that!).
965  lengthsFromBuffers[i].resize (1);
966  lengthsFromBuffers[i][0] = as<size_t> (0);
967  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
968  tag, *comm_);
969  if (verbose_) {
970  std::ostringstream os;
971  os << *prefix << "Posted any-proc irecv w/ tag " << tag << endl;
972  std::cerr << os.str();
973  }
974  }
975 
976  if (verbose_) {
977  std::ostringstream os;
978  os << *prefix << "Post " << numSends_ << " send"
979  << (numSends_ != size_t (1) ? "s" : "") << endl;
980  std::cerr << os.str();
981  }
982  // Post the sends: Tell each process to which we are sending how
983  // many packets it should expect from us in the communication
984  // pattern. We could use nonblocking sends here, as long as we do
985  // a waitAll() on all the sends and receives at once.
986  //
987  // We assume that numSends_ and selfMessage_ have already been
988  // set. The value of numSends_ (my process' number of sends) does
989  // not include any message that it might send to itself.
990  for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
991  if (procsTo_[i] != myRank) {
992  // Send a message to procsTo_[i], telling that process that
993  // this communication pattern will send that process
994  // lengthsTo_[i] blocks of packets.
995  const size_t* const lengthsTo_i = &lengthsTo_[i];
996  send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
997  if (verbose_) {
998  std::ostringstream os;
999  os << *prefix << "Posted send to Proc " << procsTo_[i] << " w/ tag "
1000  << tag << endl;
1001  std::cerr << os.str();
1002  }
1003  }
1004  else {
1005  // We don't need a send in the self-message case. If this
1006  // process will send a message to itself in the communication
1007  // pattern, then the last element of lengthsFrom_ and
1008  // procsFrom_ corresponds to the self-message. Of course
1009  // this process knows how long the message is, and the process
1010  // ID is its own process ID.
1011  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
1012  procsFrom_[numReceives_-1] = myRank;
1013  }
1014  }
1015 
1016  if (verbose_) {
1017  std::ostringstream os;
1018  const size_t numReq = requests.size();
1019  os << *prefix << "waitAll on " << numReq << " request"
1020  << (numReq != size_t(1) ? "s" : "") << endl;
1021  std::cerr << os.str();
1022  }
1023  //
1024  // Wait on all the receives. When they arrive, check the status
1025  // output of wait() for the receiving process ID, unpack the
1026  // request buffers into lengthsFrom_, and set procsFrom_ from the
1027  // status.
1028  //
1029  waitAll (*comm_, requests (), statuses ());
1030  for (size_t i = 0; i < actualNumReceives; ++i) {
1031  lengthsFrom_[i] = *lengthsFromBuffers[i];
1032  procsFrom_[i] = statuses[i]->getSourceRank ();
1033  }
1034 
1035  // Sort the procsFrom_ array, and apply the same permutation to
1036  // lengthsFrom_. This ensures that procsFrom_[i] and
1037  // lengthsFrom_[i] refer to the same thing.
1038  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
1039 
1040  // Compute indicesFrom_
1041  totalReceiveLength_ =
1042  std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
1043  indicesFrom_.clear ();
1044  // NOTE (mfh 13 Feb 2019): Epetra_MpiDistributor deliberately does
1045  // _not_ fill indicesFrom_ (what it calls "indices_from_") like
1046  // this; it leaves indicesFrom_ empty. The comment there mentions
1047  // that not filling indicesFrom_ helps reverse mode correctness.
1048 #if 0
1049  indicesFrom_.reserve (totalReceiveLength_);
1050  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1051  indicesFrom_.push_back(i);
1052  }
1053 #endif // 0
1054 
1055  startsFrom_.clear ();
1056  startsFrom_.reserve (numReceives_);
1057  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
1058  startsFrom_.push_back(j);
1059  j += lengthsFrom_[i];
1060  }
1061 
1062  if (selfMessage_) {
1063  --numReceives_;
1064  }
1065 
1066  if (verbose_) {
1067  std::ostringstream os;
1068  os << *prefix << "Done" << endl;
1069  std::cerr << os.str();
1070  }
1071  }
1072 
1073  size_t
1074  Distributor::
1075  createFromSends (const Teuchos::ArrayView<const int>& exportProcIDs)
1076  {
1077  using Teuchos::outArg;
1078  using Teuchos::REDUCE_MAX;
1079  using Teuchos::reduceAll;
1080  using std::endl;
1081  const char rawPrefix[] = "Tpetra::Distributor::createFromSends";
1082 
1083  const size_t numExports = exportProcIDs.size();
1084  const int myProcID = comm_->getRank();
1085  const int numProcs = comm_->getSize();
1086 
1087  const bool debug = Details::Behavior::debug("Distributor");
1088  const size_t maxNumToPrint = verbose_ ?
1088  Details::Behavior::verbosePrintCountThreshold() : size_t(0);
1089 
1090  std::unique_ptr<std::string> prefix;
1091  if (verbose_) {
1092  prefix = createPrefix("createFromSends");
1093  std::ostringstream os;
1094  os << *prefix << "Start: ";
1095  Details::verbosePrintArray(os, exportProcIDs, "exportPIDs",
1096  maxNumToPrint);
1097  os << endl;
1098  std::cerr << os.str();
1099  }
1100 
1101  // exportProcIDs tells us the communication pattern for this
1102  // distributor. It dictates the way that the export data will be
1103  // interpreted in doPosts(). We want to perform at most one
1104  // send per process in doPosts; this is for two reasons:
1105  // * minimize latency / overhead in the comm routines (nice)
1106  // * match the number of receives and sends between processes
1107  // (necessary)
1108  //
1109  // Teuchos::Comm requires that the data for a send are contiguous
1110  // in a send buffer. Therefore, if the data in the send buffer
1111  // for doPosts() are not contiguous, they will need to be copied
1112  // into a contiguous buffer. The user has specified this
1113  // noncontiguous pattern and we can't do anything about it.
1114  // However, if they do not provide an efficient pattern, we will
1115  // warn them if one of the following compile-time options has been
1116  // set:
1117  // * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
1118  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
1119  //
1120  // If the data are contiguous, then we can post the sends in situ
1121  // (i.e., without needing to copy them into a send buffer).
1122  //
1123  // Determine contiguity. There are a number of ways to do this:
1124  // * If the export IDs are sorted, then all exports to a
1125  // particular proc must be contiguous. This is what Epetra does.
1126  // * If the export ID of the current export already has been
1127  // listed, then the previous listing should correspond to the
1128  // same export. This tests contiguity, but not sortedness.
1129  //
1130  // Both of these tests require O(n), where n is the number of
1131  // exports. However, the latter will positively identify a greater
1132  // portion of contiguous patterns. We use the latter method.
1133  //
1134  // Check to see if values are grouped by procs without gaps
1135  // If so, indices_to -> 0.
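  // Illustrative example (not from the original source):
  //   exportProcIDs = [3, 3, 0, 0, 0] is grouped by proc; the fast path
  //     below applies and indicesTo_ stays empty.
  //   exportProcIDs = [3, 0, 3] is not grouped; the slow path builds
  //     indicesTo_, and doPosts() will need a send buffer.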
1136 
1137  // Set up data structures for quick traversal of arrays.
1138  // This contains the number of sends for each process ID.
1139  //
1140  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
1141  // that create an array of length the number of processes in the
1142  // communicator (plus one). Given how this code uses this array,
1143  // it should be straightforward to replace it with a hash table or
1144  // some other more space-efficient data structure. In practice,
1145  // most of the entries of starts should be zero for a sufficiently
1146  // large process count, unless the communication pattern is dense.
1147  // Note that it's important to be able to iterate through keys (i
1148  // for which starts[i] is nonzero) in increasing order.
1149  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1150 
1151  // numActive is the number of sends that are not Null
1152  size_t numActive = 0;
1153  int needSendBuff = 0; // Boolean
1154 
1155  int badID = -1; // only used in debug mode
1156  for (size_t i = 0; i < numExports; ++i) {
1157  const int exportID = exportProcIDs[i];
1158  if (exportID >= numProcs) {
1159  badID = myProcID;
1160  break;
1161  }
1162  else if (exportID >= 0) {
1163  // exportID is a valid process ID. Increment the number of
1164  // messages this process will send to that process.
1165  ++starts[exportID];
1166 
1167  // If we're sending more than one message to process exportID,
1168  // then it is possible that the data are not contiguous.
1169  // Check by seeing if the previous process ID in the list
1170  // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
1171  // because if starts[exportID] > 1, then i must be > 1 (since
1172  // the starts array was filled with zeros initially).
1173 
1174  // null entries break continuity.
1175  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
1176  if (needSendBuff == 0 && starts[exportID] > 1 &&
1177  exportID != exportProcIDs[i-1]) {
1178  needSendBuff = 1;
1179  }
1180  ++numActive;
1181  }
1182  }
1183 
1184  if (debug) {
1185  // Test whether any process in the communicator got an invalid
1186  // process ID. If badID != -1 on this process, then it equals
1187  // this process' rank. The max of all badID over all processes
1188  // is the max rank which has an invalid process ID.
1189  int gbl_badID;
1190  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1191  TEUCHOS_TEST_FOR_EXCEPTION
1192  (gbl_badID >= 0, std::runtime_error, rawPrefix << "Proc "
1193  << gbl_badID << ", perhaps among other processes, got a bad "
1194  "send process ID.");
1195  }
1196  // FIXME (mfh 12 Apr 2013, 15 Jul 2015, 13 Feb 2020) Rather than
1197  // simply ignoring this information when not in debug mode, we
1198  // should think about how to pass it along so that all the
1199  // processes find out about it. In a release build with
1200  // efficiency warnings turned off, the next collective
1201  // communication happens in computeReceives(). We could figure
1202  // out how to encode the error flag in that operation, for example
1203  // by adding an extra entry to the collective's output array that
1204  // encodes the error condition (0 on all processes if no error,
1205  // else 1 on any process with the error, so that the sum will
1206  // produce a nonzero value if any process had an error). I'll
1207  // defer this change for now and recommend instead that people
1208  // with troubles try a debug build.
1209 
1210 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1211  {
1212  int global_needSendBuff;
1213  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1214  outArg (global_needSendBuff));
1215  TPETRA_EFFICIENCY_WARNING(
1216  global_needSendBuff != 0, std::runtime_error,
1217  "::createFromSends: Grouping export IDs together by process rank often "
1218  "improves performance.");
1219  }
1220 #endif
1221 
1222  if (verbose_) {
1223  std::ostringstream os;
1224  os << *prefix << "Detect whether I have a self message" << endl;
1225  std::cerr << os.str();
1226  }
1227 
1228  // Determine from the caller's data whether or not the current
1229  // process should send (a) message(s) to itself.
1230  if (starts[myProcID] != 0) {
1231  selfMessage_ = true;
1232  }
1233  else {
1234  selfMessage_ = false;
1235  }
1236 
1237  bool index_neq_numActive = false;
1238  bool send_neq_numSends = false;
1239  if (! needSendBuff) {
1240  if (verbose_) {
1241  std::ostringstream os;
1242  os << *prefix << "I don't need a send buffer or indicesTo_ "
1243  "(fast path)" << endl;
1244  std::cerr << os.str();
1245  }
1246  // grouped by proc, no send buffer or indicesTo_ needed
1247  numSends_ = 0;
1248  // Count total number of sends, i.e., total number of procs to
1249  // which we are sending. This includes myself, if applicable.
1250  for (int i = 0; i < numProcs; ++i) {
1251  if (starts[i]) {
1252  ++numSends_;
1253  }
1254  }
1255 
1256  // Not only do we not need these, but we must clear them, as
1257  // empty status of indicesTo is a flag used later.
1258  indicesTo_.resize(0);
1259  // Size these to numSends_; note, at the moment, numSends_
1260  // includes self sends. Set their values to zeros.
1261  procsTo_.assign(numSends_,0);
1262  startsTo_.assign(numSends_,0);
1263  lengthsTo_.assign(numSends_,0);
1264 
1265  // set startsTo to the offset for each send (i.e., each proc ID)
1266  // set procsTo to the proc ID for each send
1267  // in interpreting this code, remember that we are assuming contiguity
1268  // that is why index skips through the ranks
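  // Illustrative example (not from the original source): if
  // exportProcIDs = [5, 5, 5, 2, 2], then numSends_ = 2 and, before the
  // sort below, procsTo_ = [5, 2] and startsTo_ = [0, 3].  After sort2,
  // procsTo_ = [2, 5] and startsTo_ = [3, 0], and the loop that follows
  // sets lengthsTo_ = [2, 3].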
1269  {
1270  size_t index = 0, procIndex = 0;
1271  for (size_t i = 0; i < numSends_; ++i) {
1272  while (exportProcIDs[procIndex] < 0) {
1273  ++procIndex; // skip all negative proc IDs
1274  }
1275  startsTo_[i] = procIndex;
1276  int procID = exportProcIDs[procIndex];
1277  procsTo_[i] = procID;
1278  index += starts[procID];
1279  procIndex += starts[procID];
1280  }
1281  if (index != numActive) {
1282  index_neq_numActive = true;
1283  }
1284  }
1285  // sort the startsTo and proc IDs together, in ascending order, according
1286  // to proc IDs
1287  if (numSends_ > 0) {
1288  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1289  }
1290  // compute the maximum send length
1291  maxSendLength_ = 0;
1292  for (size_t i = 0; i < numSends_; ++i) {
1293  int procID = procsTo_[i];
1294  lengthsTo_[i] = starts[procID];
1295  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1296  maxSendLength_ = lengthsTo_[i];
1297  }
1298  }
1299  }
1300  else {
1301  if (verbose_) {
1302  std::ostringstream os;
1303  os << *prefix << "I need a send buffer & indicesTo_ "
1304  "(slow path)" << endl;
1305  std::cerr << os.str();
1306  }
1307  // not grouped by proc, need send buffer and indicesTo_
1308 
1309  // starts[i] is the number of sends to proc i
1310  // numActive equals number of sends total, \sum_i starts[i]
1311 
1312  // this loop starts at starts[1], so explicitly check starts[0]
1313  if (starts[0] == 0 ) {
1314  numSends_ = 0;
1315  }
1316  else {
1317  numSends_ = 1;
1318  }
1319  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1320  im1=starts.begin();
1321  i != starts.end(); ++i)
1322  {
1323  if (*i != 0) ++numSends_;
1324  *i += *im1;
1325  im1 = i;
1326  }
1327  // starts[i] now contains the number of exports to procs 0 through i
1328 
1329  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1330  i=starts.rbegin()+1;
1331  i != starts.rend(); ++i)
1332  {
1333  *ip1 = *i;
1334  ip1 = i;
1335  }
1336  starts[0] = 0;
1337  // starts[i] now contains the number of exports to procs 0 through
1338  // i-1, i.e., all procs before proc i
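  // Illustrative example (not from the original source): with
  // numProcs = 3 and per-proc send counts starts = [2, 0, 1], the forward
  // pass above yields the inclusive sums [2, 2, 3], and the backward
  // shift plus starts[0] = 0 yields the exclusive prefix sums [0, 2, 2]:
  // starts[p] is now the offset of proc p's block in the send buffer.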
1339 
1340  indicesTo_.resize(numActive);
1341 
1342  for (size_t i = 0; i < numExports; ++i) {
1343  if (exportProcIDs[i] >= 0) {
1344  // record the offset to the sendBuffer for this export
1345  indicesTo_[starts[exportProcIDs[i]]] = i;
1346  // now increment the offset for this proc
1347  ++starts[exportProcIDs[i]];
1348  }
1349  }
1350  // our send buffer will contain the export data for each of the procs
1351  // we communicate with, in order by proc id
1352  // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
1353  // indicesTo now maps each export to the location in our send buffer
1354  // associated with the export
1355  // data for export i located at sendBuffer[indicesTo[i]]
1356  //
1357  // starts[i] once again contains the number of exports to
1358  // procs 0 through i
1359  for (int proc = numProcs-1; proc != 0; --proc) {
1360  starts[proc] = starts[proc-1];
1361  }
1362  starts.front() = 0;
1363  starts[numProcs] = numActive;
1364  //
1365  // starts[proc] once again contains the number of exports to
1366  // procs 0 through proc-1
1367  // i.e., the start of my data in the sendBuffer
1368 
1369  // this contains invalid data at procs we don't care about, that is okay
1370  procsTo_.resize(numSends_);
1371  startsTo_.resize(numSends_);
1372  lengthsTo_.resize(numSends_);
1373 
1374  // for each group of sends/exports, record the destination proc,
1375  // the length, and the offset for this send into the
1376  // send buffer (startsTo_)
1377  maxSendLength_ = 0;
1378  size_t snd = 0;
1379  for (int proc = 0; proc < numProcs; ++proc ) {
1380  if (starts[proc+1] != starts[proc]) {
1381  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1382  startsTo_[snd] = starts[proc];
1383  // record max length for all off-proc sends
1384  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1385  maxSendLength_ = lengthsTo_[snd];
1386  }
1387  procsTo_[snd] = proc;
1388  ++snd;
1389  }
1390  }
1391  if (snd != numSends_) {
1392  send_neq_numSends = true;
1393  }
1394  }
1395  if (debug) {
1396  SHARED_TEST_FOR_EXCEPTION
1397  (index_neq_numActive, std::logic_error,
1398  rawPrefix << "logic error. Please notify the Tpetra team.", *comm_);
1399  SHARED_TEST_FOR_EXCEPTION
1400  (send_neq_numSends, std::logic_error,
1401  rawPrefix << "logic error. Please notify the Tpetra team.", *comm_);
1402  }
1403 
1404  if (selfMessage_) {
1405  if (verbose_) {
1406  std::ostringstream os;
1407  os << *prefix << "Sending self message; numSends "
1408  << numSends_ << " -> " << (numSends_ - 1) << endl;
1409  std::cerr << os.str();
1410  }
1411  --numSends_;
1412  }
1413 
1414  // Invert map to see what msgs are received and what length
1415  computeReceives();
1416 
1417  // createFromRecvs() calls createFromSends(), but will set
1418  // howInitialized_ again after calling createFromSends().
1419  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1420 
1421  if (verbose_) {
1422  std::ostringstream os;
1423  os << *prefix << "Done; totalReceiveLength_="
1424  << totalReceiveLength_ << endl;
1425  std::cerr << os.str();
1426  }
1427  return totalReceiveLength_;
1428  }
1429 
1430  void
1431  Distributor::
1432  createFromSendsAndRecvs (const Teuchos::ArrayView<const int>& exportProcIDs,
1433  const Teuchos::ArrayView<const int>& remoteProcIDs)
1434  {
1435  std::unique_ptr<std::string> prefix;
1436  if (verbose_) {
1437  prefix = createPrefix("createFromSendsAndRecvs");
1438  std::ostringstream os;
1439  os << *prefix << "Start" << std::endl;
1440  std::cerr << os.str();
1441  }
1442 
1443  // Note: exportProcIDs and remoteProcIDs _must_ each have an entry for
1444  // every GID. If the export/remote proc IDs are taken from the
1445  // getProcs{From|To} lists extracted from a previous Distributor,
1446  // this will give a wrong answer, because those lists have a unique
1447  // entry for each process ID. A version of this method that takes
1448  // lengthsTo and lengthsFrom should be made.
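  // Illustrative example (not from the original source): if this process
  // exports three GIDs, two to Proc 2 and one to Proc 0, then
  // exportProcIDs must be [2, 2, 0] (one entry per GID), not the
  // per-process list [2, 0] that getProcsTo() would return.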
1449 
1450  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1451 
1452 
1453  int myProcID = comm_->getRank ();
1454  int numProcs = comm_->getSize();
1455 
1456  const size_t numExportIDs = exportProcIDs.size();
1457  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1458 
1459  size_t numActive = 0;
1460  int needSendBuff = 0; // Boolean
1461 
1462  for(size_t i = 0; i < numExportIDs; i++ )
1463  {
1464  if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1465  needSendBuff = 1;
1466  if( exportProcIDs[i] >= 0 )
1467  {
1468  ++starts[ exportProcIDs[i] ];
1469  ++numActive;
1470  }
1471  }
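  // At this point, needSendBuff is nonzero exactly when the export proc
  // IDs ever decrease (e.g., {0, 2, 0}), i.e., the exports are not
  // already grouped by destination proc.  starts[p] holds the number of
  // exports destined for proc p, and numActive counts the valid
  // (nonnegative) entries of exportProcIDs.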
1472 
1473  selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
1474 
1475  numSends_ = 0;
1476 
1477  if( needSendBuff ) // exports are not grouped by proc; build indicesTo_ (the send-buffer permutation)
1478  {
1479  if (starts[0] == 0 ) {
1480  numSends_ = 0;
1481  }
1482  else {
1483  numSends_ = 1;
1484  }
1485  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1486  im1=starts.begin();
1487  i != starts.end(); ++i)
1488  {
1489  if (*i != 0) ++numSends_;
1490  *i += *im1;
1491  im1 = i;
1492  }
1493  // starts[i] now contains the number of exports to procs 0 through i
1494 
1495  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1496  i=starts.rbegin()+1;
1497  i != starts.rend(); ++i)
1498  {
1499  *ip1 = *i;
1500  ip1 = i;
1501  }
1502  starts[0] = 0;
1503  // starts[i] now contains the number of exports to procs 0 through
1504  // i-1, i.e., all procs before proc i
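  // (Same bookkeeping as in createFromSends above: starts is now an
  // exclusive prefix sum over destination procs.)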
1505 
1506  indicesTo_.resize(numActive);
1507 
1508  for (size_t i = 0; i < numExportIDs; ++i) {
1509  if (exportProcIDs[i] >= 0) {
1510  // record the offset to the sendBuffer for this export
1511  indicesTo_[starts[exportProcIDs[i]]] = i;
1512  // now increment the offset for this proc
1513  ++starts[exportProcIDs[i]];
1514  }
1515  }
1516  for (int proc = numProcs-1; proc != 0; --proc) {
1517  starts[proc] = starts[proc-1];
1518  }
1519  starts.front() = 0;
1520  starts[numProcs] = numActive;
1521  procsTo_.resize(numSends_);
1522  startsTo_.resize(numSends_);
1523  lengthsTo_.resize(numSends_);
1524  maxSendLength_ = 0;
1525  size_t snd = 0;
1526  for (int proc = 0; proc < numProcs; ++proc ) {
1527  if (starts[proc+1] != starts[proc]) {
1528  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1529  startsTo_[snd] = starts[proc];
1530  // record max length for all off-proc sends
1531  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1532  maxSendLength_ = lengthsTo_[snd];
1533  }
1534  procsTo_[snd] = proc;
1535  ++snd;
1536  }
1537  }
1538  }
1539  else {
1540  // grouped by proc, no send buffer or indicesTo_ needed
1541  numSends_ = 0;
1542  // Count total number of sends, i.e., total number of procs to
1543  // which we are sending. This includes myself, if applicable.
1544  for (int i = 0; i < numProcs; ++i) {
1545  if (starts[i]) {
1546  ++numSends_;
1547  }
1548  }
1549 
1550  // Not only do we not need indicesTo_, we must clear it: later code
1551  // uses "indicesTo_ is empty" as a flag meaning the exports are grouped by proc.
1552  indicesTo_.resize(0);
1553  // Size these to numSends_ (which, at this point, still includes any
1554  // self send) and fill them with zeros.
1555  procsTo_.assign(numSends_,0);
1556  startsTo_.assign(numSends_,0);
1557  lengthsTo_.assign(numSends_,0);
1558 
1559  // Set startsTo_ to the offset (in the export list) of each send's
1560  // first entry, and procsTo_ to the destination proc of each send.
1561  // In reading this code, remember that we assume the exports are grouped
1562  // contiguously by proc; that is why procIndex can jump ahead by starts[procID].
1563  {
1564  size_t index = 0, procIndex = 0;
1565  for (size_t i = 0; i < numSends_; ++i) {
1566  while (exportProcIDs[procIndex] < 0) {
1567  ++procIndex; // skip all negative proc IDs
1568  }
1569  startsTo_[i] = procIndex;
1570  int procID = exportProcIDs[procIndex];
1571  procsTo_[i] = procID;
1572  index += starts[procID];
1573  procIndex += starts[procID];
1574  }
1575  }
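  // For illustration only (hypothetical values): with numProcs >= 4 and
  // exportProcIDs = {-1, 0, 0, 3, 3, 3}, this scan produces
  // procsTo_ = {0, 3} and startsTo_ = {1, 3}; the loop below then sets
  // lengthsTo_ = {2, 3} from the per-proc counts still held in starts.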
1576  // sort the startsTo and proc IDs together, in ascending order, according
1577  // to proc IDs
1578  if (numSends_ > 0) {
1579  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1580  }
1581  // compute the maximum send length
1582  maxSendLength_ = 0;
1583  for (size_t i = 0; i < numSends_; ++i) {
1584  int procID = procsTo_[i];
1585  lengthsTo_[i] = starts[procID];
1586  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1587  maxSendLength_ = lengthsTo_[i];
1588  }
1589  }
1590  }
1591 
1592 
1593  numSends_ -= selfMessage_;
1594  std::vector<int> recv_list;
1595  recv_list.reserve(numSends_); //reserve an initial guess for size needed
1596 
1597  int last_pid=-2;
1598  for(int i=0; i<remoteProcIDs.size(); i++) {
1599  if(remoteProcIDs[i]>last_pid) {
1600  recv_list.push_back(remoteProcIDs[i]);
1601  last_pid = remoteProcIDs[i];
1602  }
1603  else if (remoteProcIDs[i]<last_pid)
1604  throw std::runtime_error("Tpetra::Distributor::createFromSendsAndRecvs expected remoteProcIDs to be in sorted order");
1605  }
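  // For illustration only (hypothetical values): remoteProcIDs =
  // {1, 1, 4, 4, 4} yields recv_list = {1, 4}; an out-of-order input
  // such as {4, 1} triggers the exception above.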
1606  numReceives_ = recv_list.size();
1607  if(numReceives_) {
1608  procsFrom_.assign(numReceives_,0);
1609  lengthsFrom_.assign(numReceives_,0);
1610  indicesFrom_.assign(numReceives_,0);
1611  startsFrom_.assign(numReceives_,0);
1612  }
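  // Walk the sorted remoteProcIDs list once.  For each distinct sending
  // proc, record its rank (procsFrom_), the offset of its first entry
  // (startsFrom_), and how many entries it contributes (lengthsFrom_).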
1613  for(size_t i=0,j=0; i<numReceives_; ++i) {
1614  int jlast=j;
1615  procsFrom_[i] = recv_list[i];
1616  startsFrom_[i] = j;
1617  for( ; j<(size_t)remoteProcIDs.size() &&
1618  remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
1619  lengthsFrom_[i] = j-jlast;
1620  }
1621  totalReceiveLength_ = remoteProcIDs.size();
1622  indicesFrom_.clear ();
1623  // NOTE (mfh 13 Feb 2019): Epetra_MpiDistributor deliberately does
1624  // _not_ fill indicesFrom_ (what it calls "indices_from_") like
1625  // this; it leaves indicesFrom_ empty. The comment there mentions
1626  // that not filling indicesFrom_ helps reverse mode correctness.
1627 #if 0
1628  indicesFrom_.reserve (totalReceiveLength_);
1629  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1630  indicesFrom_.push_back(i);
1631  }
1632 #endif // 0
1633  numReceives_-=selfMessage_;
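  // As with numSends_ above, numReceives_ now counts only messages
  // exchanged with other processes; any self message has been excluded.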
1634 
1635  if (verbose_) {
1636  std::ostringstream os;
1637  os << *prefix << "Done" << std::endl;
1638  std::cerr << os.str();
1639  }
1640  }
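  // A minimal usage sketch of createFromSendsAndRecvs (illustrative
  // only; the variable names are hypothetical).  Unlike createFromSends,
  // the caller must already know both sides of the communication plan:
  //
  //   Tpetra::Distributor distributor (comm);
  //   // exportProcIDs[i]: rank that receives packet i (one entry per packet)
  //   // remoteProcIDs[j]: rank that sends packet j (must be sorted by rank)
  //   distributor.createFromSendsAndRecvs (exportProcIDs (), remoteProcIDs ());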
1641 
1642 } // namespace Tpetra