39 #include "Tpetra_Distributor.hpp"
43 #include "Tpetra_Details_makeValidVerboseStream.hpp"
44 #include "Teuchos_StandardParameterEntryValidators.hpp"
45 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
53 if (sendType == DISTRIBUTOR_ISEND) {
56 else if (sendType == DISTRIBUTOR_RSEND) {
59 else if (sendType == DISTRIBUTOR_SEND) {
62 else if (sendType == DISTRIBUTOR_SSEND) {
66 TEUCHOS_TEST_FOR_EXCEPTION(
true, std::invalid_argument,
"Invalid "
67 "EDistributorSendType enum value " << sendType <<
".");
75 case Details::DISTRIBUTOR_NOT_INITIALIZED:
76 return "Not initialized yet";
77 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
78 return "By createFromSends";
79 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
80 return "By createFromRecvs";
81 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
82 return "By createFromSendsAndRecvs";
83 case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
84 return "By createReverseDistributor";
85 case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
86 return "By copy constructor";
93 Teuchos::Array<std::string>
96 Teuchos::Array<std::string> sendTypes;
97 sendTypes.push_back (
"Isend");
98 sendTypes.push_back (
"Rsend");
99 sendTypes.push_back (
"Send");
100 sendTypes.push_back (
"Ssend");
110 const bool tpetraDistributorDebugDefault =
false;
112 const bool barrierBetween_default =
false;
114 const bool useDistinctTags_default =
true;
117 int Distributor::getTag (
const int pathTag)
const {
118 return useDistinctTags_ ? pathTag : comm_->getTag ();
122 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
123 void Distributor::makeTimers () {
124 timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (
125 "Tpetra::Distributor: doWaits");
127 timer_doPosts3TA_ = Teuchos::TimeMonitor::getNewTimer (
128 "Tpetra::Distributor: doPosts(3) TA");
129 timer_doPosts4TA_ = Teuchos::TimeMonitor::getNewTimer (
130 "Tpetra::Distributor: doPosts(4) TA");
132 timer_doPosts3TA_recvs_ = Teuchos::TimeMonitor::getNewTimer (
133 "Tpetra::Distributor: doPosts(3): recvs TA");
134 timer_doPosts4TA_recvs_ = Teuchos::TimeMonitor::getNewTimer (
135 "Tpetra::Distributor: doPosts(4): recvs TA");
137 timer_doPosts3TA_barrier_ = Teuchos::TimeMonitor::getNewTimer (
138 "Tpetra::Distributor: doPosts(3): barrier TA");
139 timer_doPosts4TA_barrier_ = Teuchos::TimeMonitor::getNewTimer (
140 "Tpetra::Distributor: doPosts(4): barrier TA");
142 timer_doPosts3TA_sends_ = Teuchos::TimeMonitor::getNewTimer (
143 "Tpetra::Distributor: doPosts(3): sends TA");
144 timer_doPosts4TA_sends_ = Teuchos::TimeMonitor::getNewTimer (
145 "Tpetra::Distributor: doPosts(4): sends TA");
146 timer_doPosts3TA_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
147 "Tpetra::Distributor: doPosts(3): sends TA SLOW");
148 timer_doPosts4TA_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
149 "Tpetra::Distributor: doPosts(4): sends TA SLOW");
150 timer_doPosts3TA_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
151 "Tpetra::Distributor: doPosts(3): sends TA FAST");
152 timer_doPosts4TA_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
153 "Tpetra::Distributor: doPosts(4): sends TA FAST");
155 timer_doPosts3KV_ = Teuchos::TimeMonitor::getNewTimer (
156 "Tpetra::Distributor: doPosts(3) KV");
157 timer_doPosts4KV_ = Teuchos::TimeMonitor::getNewTimer (
158 "Tpetra::Distributor: doPosts(4) KV");
160 timer_doPosts3KV_recvs_ = Teuchos::TimeMonitor::getNewTimer (
161 "Tpetra::Distributor: doPosts(3): recvs KV");
162 timer_doPosts4KV_recvs_ = Teuchos::TimeMonitor::getNewTimer (
163 "Tpetra::Distributor: doPosts(4): recvs KV");
165 timer_doPosts3KV_barrier_ = Teuchos::TimeMonitor::getNewTimer (
166 "Tpetra::Distributor: doPosts(3): barrier KV");
167 timer_doPosts4KV_barrier_ = Teuchos::TimeMonitor::getNewTimer (
168 "Tpetra::Distributor: doPosts(4): barrier KV");
170 timer_doPosts3KV_sends_ = Teuchos::TimeMonitor::getNewTimer (
171 "Tpetra::Distributor: doPosts(3): sends KV");
172 timer_doPosts4KV_sends_ = Teuchos::TimeMonitor::getNewTimer (
173 "Tpetra::Distributor: doPosts(4): sends KV");
174 timer_doPosts3KV_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
175 "Tpetra::Distributor: doPosts(3): sends KV SLOW");
176 timer_doPosts4KV_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
177 "Tpetra::Distributor: doPosts(4): sends KV SLOW");
178 timer_doPosts3KV_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
179 "Tpetra::Distributor: doPosts(3): sends KV FAST");
180 timer_doPosts4KV_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
181 "Tpetra::Distributor: doPosts(4): sends KV FAST");
183 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
187 const Teuchos::RCP<Teuchos::FancyOStream>& ,
188 const Teuchos::RCP<Teuchos::ParameterList>& plist)
190 , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
191 , sendType_ (Details::DISTRIBUTOR_SEND)
192 , barrierBetween_ (barrierBetween_default)
193 , selfMessage_ (false)
197 , totalReceiveLength_ (0)
198 , lastRoundBytesSend_ (0)
199 , lastRoundBytesRecv_ (0)
200 , useDistinctTags_ (useDistinctTags_default)
203 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
205 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
215 const Teuchos::RCP<Teuchos::FancyOStream>& out)
221 const Teuchos::RCP<Teuchos::ParameterList>& plist)
227 : comm_ (distributor.comm_)
228 , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
229 , sendType_ (distributor.sendType_)
230 , barrierBetween_ (distributor.barrierBetween_)
231 , verbose_ (distributor.verbose_)
232 , selfMessage_ (distributor.selfMessage_)
233 , numSends_ (distributor.numSends_)
234 , procsTo_ (distributor.procsTo_)
235 , startsTo_ (distributor.startsTo_)
236 , lengthsTo_ (distributor.lengthsTo_)
237 , maxSendLength_ (distributor.maxSendLength_)
238 , indicesTo_ (distributor.indicesTo_)
239 , numReceives_ (distributor.numReceives_)
240 , totalReceiveLength_ (distributor.totalReceiveLength_)
241 , lengthsFrom_ (distributor.lengthsFrom_)
242 , procsFrom_ (distributor.procsFrom_)
243 , startsFrom_ (distributor.startsFrom_)
244 , indicesFrom_ (distributor.indicesFrom_)
245 , reverseDistributor_ (distributor.reverseDistributor_)
246 , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
247 , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
248 , useDistinctTags_ (distributor.useDistinctTags_)
250 using Teuchos::ParameterList;
254 RCP<const ParameterList> rhsList = distributor.getParameterList ();
255 RCP<ParameterList> newList = rhsList.is_null () ? Teuchos::null :
256 Teuchos::parameterList (*rhsList);
259 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
261 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
265 using Teuchos::ParameterList;
266 using Teuchos::parameterList;
269 std::swap (comm_, rhs.comm_);
270 std::swap (howInitialized_, rhs.howInitialized_);
271 std::swap (sendType_, rhs.sendType_);
272 std::swap (barrierBetween_, rhs.barrierBetween_);
273 std::swap (verbose_, rhs.verbose_);
274 std::swap (selfMessage_, rhs.selfMessage_);
275 std::swap (numSends_, rhs.numSends_);
276 std::swap (procsTo_, rhs.procsTo_);
277 std::swap (startsTo_, rhs.startsTo_);
278 std::swap (lengthsTo_, rhs.lengthsTo_);
279 std::swap (maxSendLength_, rhs.maxSendLength_);
280 std::swap (indicesTo_, rhs.indicesTo_);
281 std::swap (numReceives_, rhs.numReceives_);
282 std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
283 std::swap (lengthsFrom_, rhs.lengthsFrom_);
284 std::swap (procsFrom_, rhs.procsFrom_);
285 std::swap (startsFrom_, rhs.startsFrom_);
286 std::swap (indicesFrom_, rhs.indicesFrom_);
287 std::swap (reverseDistributor_, rhs.reverseDistributor_);
288 std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
289 std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
290 std::swap (useDistinctTags_, rhs.useDistinctTags_);
294 RCP<ParameterList> lhsList = this->getNonconstParameterList ();
295 RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
296 if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
297 rhsList = parameterList (*rhsList);
299 if (! rhsList.is_null ()) {
300 this->setMyParamList (rhsList);
302 if (! lhsList.is_null ()) {
303 rhs.setMyParamList (lhsList);
311 Distributor::getVerbose()
317 std::unique_ptr<std::string>
319 createPrefix(
const char methodName[])
const
322 comm_.getRawPtr(),
"Distributor", methodName);
329 using ::Tpetra::Details::Behavior;
330 using Teuchos::FancyOStream;
331 using Teuchos::getIntegralValue;
332 using Teuchos::includesVerbLevel;
333 using Teuchos::ParameterList;
334 using Teuchos::parameterList;
338 if (! plist.is_null()) {
340 plist->validateParametersAndSetDefaults (*validParams);
342 const bool barrierBetween =
343 plist->get<
bool> (
"Barrier between receives and sends");
345 getIntegralValue<Details::EDistributorSendType> (*plist,
"Send type");
346 const bool useDistinctTags = plist->get<
bool> (
"Use distinct tags");
351 const bool enable_cuda_rdma =
352 plist->get<
bool> (
"Enable MPI CUDA RDMA support");
353 TEUCHOS_TEST_FOR_EXCEPTION
354 (! enable_cuda_rdma, std::invalid_argument,
"Tpetra::Distributor::"
355 "setParameterList: " <<
"You specified \"Enable MPI CUDA RDMA "
356 "support\" = false. This is no longer valid. You don't need to "
357 "specify this option any more; Tpetra assumes it is always true. "
358 "This is a very light assumption on the MPI implementation, and in "
359 "fact does not actually involve hardware or system RDMA support. "
360 "Tpetra just assumes that the MPI implementation can tell whether a "
361 "pointer points to host memory or CUDA device memory.");
368 TEUCHOS_TEST_FOR_EXCEPTION
369 (! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
370 std::invalid_argument,
"Tpetra::Distributor::setParameterList: " << endl
371 <<
"You specified \"Send type\"=\"Rsend\", but turned off the barrier "
372 "between receives and sends." << endl <<
"This is invalid; you must "
373 "include the barrier if you use ready sends." << endl <<
"Ready sends "
374 "require that their corresponding receives have already been posted, "
375 "and the only way to guarantee that in general is with a barrier.");
378 sendType_ = sendType;
379 barrierBetween_ = barrierBetween;
380 useDistinctTags_ = useDistinctTags;
384 this->setMyParamList (plist);
388 Teuchos::RCP<const Teuchos::ParameterList>
391 using Teuchos::Array;
392 using Teuchos::ParameterList;
393 using Teuchos::parameterList;
395 using Teuchos::setStringToIntegralParameter;
397 const bool barrierBetween = barrierBetween_default;
398 const bool useDistinctTags = useDistinctTags_default;
399 const bool debug = tpetraDistributorDebugDefault;
402 const std::string defaultSendType (
"Send");
403 Array<Details::EDistributorSendType> sendTypeEnums;
404 sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
405 sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
406 sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
407 sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
409 RCP<ParameterList> plist = parameterList (
"Tpetra::Distributor");
410 plist->set (
"Barrier between receives and sends", barrierBetween,
411 "Whether to execute a barrier between receives and sends in do"
412 "[Reverse]Posts(). Required for correctness when \"Send type\""
413 "=\"Rsend\", otherwise correct but not recommended.");
414 setStringToIntegralParameter<Details::EDistributorSendType> (
"Send type",
415 defaultSendType,
"When using MPI, the variant of send to use in "
416 "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
417 plist->set (
"Use distinct tags", useDistinctTags,
"Whether to use distinct "
418 "MPI message tags for different code paths. Highly recommended"
419 " to avoid message collisions.");
420 plist->set (
"Debug", debug,
"Whether to print copious debugging output on "
422 plist->set (
"Timer Label",
"",
"Label for Time Monitor output");
423 plist->set (
"Enable MPI CUDA RDMA support",
true,
"Assume that MPI can "
424 "tell whether a pointer points to host memory or CUDA device "
425 "memory. You don't need to specify this option any more; "
426 "Tpetra assumes it is always true. This is a very light "
427 "assumption on the MPI implementation, and in fact does not "
428 "actually involve hardware or system RDMA support.");
436 Teuchos::setupVerboseObjectSublist (&*plist);
437 return Teuchos::rcp_const_cast<
const ParameterList> (plist);
442 {
return totalReceiveLength_; }
445 {
return numReceives_; }
448 {
return selfMessage_; }
451 {
return numSends_; }
454 {
return maxSendLength_; }
457 {
return procsFrom_; }
460 {
return lengthsFrom_; }
466 {
return lengthsTo_; }
468 Teuchos::RCP<Distributor>
470 if (reverseDistributor_.is_null () && create) {
471 createReverseDistributor ();
473 TEUCHOS_TEST_FOR_EXCEPTION
474 (reverseDistributor_.is_null () && create, std::logic_error,
"The reverse "
475 "Distributor is null after createReverseDistributor returned. "
476 "Please report this bug to the Tpetra developers.");
477 return reverseDistributor_;
482 Distributor::createReverseDistributor()
const
484 reverseDistributor_ = Teuchos::rcp(
new Distributor(comm_));
485 reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
486 reverseDistributor_->sendType_ = sendType_;
487 reverseDistributor_->barrierBetween_ = barrierBetween_;
488 reverseDistributor_->verbose_ = verbose_;
// Total number of entries this process sends; this becomes the reverse
// plan's total receive length.  The seed must be a size_t: std::accumulate
// carries the running sum in the type of its initial value, so a plain int 0
// would sum the size_t lengths in int and overflow/truncate for large totals.
493 size_t totalSendLength =
494 std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), size_t (0));
499 size_t maxReceiveLength = 0;
500 const int myProcID = comm_->getRank();
501 for (
size_t i=0; i < numReceives_; ++i) {
502 if (procsFrom_[i] != myProcID) {
504 if (lengthsFrom_[i] > maxReceiveLength) {
505 maxReceiveLength = lengthsFrom_[i];
514 reverseDistributor_->selfMessage_ = selfMessage_;
515 reverseDistributor_->numSends_ = numReceives_;
516 reverseDistributor_->procsTo_ = procsFrom_;
517 reverseDistributor_->startsTo_ = startsFrom_;
518 reverseDistributor_->lengthsTo_ = lengthsFrom_;
519 reverseDistributor_->maxSendLength_ = maxReceiveLength;
520 reverseDistributor_->indicesTo_ = indicesFrom_;
521 reverseDistributor_->numReceives_ = numSends_;
522 reverseDistributor_->totalReceiveLength_ = totalSendLength;
523 reverseDistributor_->lengthsFrom_ = lengthsTo_;
524 reverseDistributor_->procsFrom_ = procsTo_;
525 reverseDistributor_->startsFrom_ = startsTo_;
526 reverseDistributor_->indicesFrom_ = indicesTo_;
535 reverseDistributor_->lastRoundBytesSend_ = 0;
536 reverseDistributor_->lastRoundBytesRecv_ = 0;
538 reverseDistributor_->useDistinctTags_ = useDistinctTags_;
551 reverseDistributor_->reverseDistributor_ = Teuchos::null;
557 using Teuchos::Array;
558 using Teuchos::CommRequest;
559 using Teuchos::FancyOStream;
560 using Teuchos::includesVerbLevel;
561 using Teuchos::is_null;
563 using Teuchos::waitAll;
566 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
567 Teuchos::TimeMonitor timeMon (*timer_doWaits_);
568 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
572 std::unique_ptr<std::string> prefix;
574 prefix = createPrefix(
"doWaits");
575 std::ostringstream os;
576 os << *prefix <<
"Start: requests_.size(): "
577 << requests_.size() << endl;
578 std::cerr << os.str();
581 if (requests_.size() > 0) {
582 waitAll(*comm_, requests_());
586 for (
auto it = requests_.begin(); it != requests_.end(); ++it) {
// Sanity check on each request after waitAll: a request that is still
// non-null is reported as a runtime_error.
587 TEUCHOS_TEST_FOR_EXCEPTION
588 (! is_null(*it), std::runtime_error,
589 "Tpetra::Distributor::doWaits: Communication requests "
590 "should all be null after calling Teuchos::waitAll on "
591 "them, but at least one request is not null.");
596 requests_.resize (0);
600 const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
601 int globalSizeNonzero = 0;
602 Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
604 Teuchos::outArg (globalSizeNonzero));
605 TEUCHOS_TEST_FOR_EXCEPTION(
606 globalSizeNonzero != 0, std::runtime_error,
607 "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
608 "a nonzero number of outstanding posts. There should be none at this "
609 "point. Please report this bug to the Tpetra developers.");
613 std::ostringstream os;
614 os << *prefix <<
"Done" << endl;
615 std::cerr << os.str();
621 if (! reverseDistributor_.is_null()) {
622 reverseDistributor_->doWaits();
627 std::ostringstream out;
629 out <<
"\"Tpetra::Distributor\": {";
630 const std::string label = this->getObjectLabel ();
632 out <<
"Label: " << label <<
", ";
634 out <<
"How initialized: "
638 << DistributorSendTypeEnumToString (sendType_)
639 <<
", Barrier between receives and sends: "
640 << (barrierBetween_ ?
"true" :
"false")
641 <<
", Use distinct tags: "
642 << (useDistinctTags_ ?
"true" :
"false")
643 <<
", Debug: " << (verbose_ ?
"true" :
"false")
650 localDescribeToString (
const Teuchos::EVerbosityLevel vl)
const
652 using Teuchos::toString;
653 using Teuchos::VERB_HIGH;
654 using Teuchos::VERB_EXTREME;
658 if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
659 return std::string ();
662 auto outStringP = Teuchos::rcp (
new std::ostringstream ());
663 auto outp = Teuchos::getFancyOStream (outStringP);
664 Teuchos::FancyOStream& out = *outp;
666 const int myRank = comm_->getRank ();
667 const int numProcs = comm_->getSize ();
668 out <<
"Process " << myRank <<
" of " << numProcs <<
":" << endl;
669 Teuchos::OSTab tab1 (out);
673 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
674 out <<
"procsTo: " << toString (procsTo_) << endl;
675 out <<
"lengthsTo: " << toString (lengthsTo_) << endl;
678 if (vl == VERB_EXTREME) {
679 out <<
"startsTo: " << toString (startsTo_) << endl;
680 out <<
"indicesTo: " << toString (indicesTo_) << endl;
682 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
685 out <<
"lengthsFrom: " << toString (lengthsFrom_) << endl;
686 out <<
"startsFrom: " << toString (startsFrom_) << endl;
687 out <<
"procsFrom: " << toString (procsFrom_) << endl;
691 return outStringP->str ();
697 const Teuchos::EVerbosityLevel verbLevel)
const
700 using Teuchos::VERB_DEFAULT;
701 using Teuchos::VERB_NONE;
702 using Teuchos::VERB_LOW;
703 using Teuchos::VERB_MEDIUM;
704 using Teuchos::VERB_HIGH;
705 using Teuchos::VERB_EXTREME;
706 const Teuchos::EVerbosityLevel vl =
707 (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
709 if (vl == VERB_NONE) {
717 if (comm_.is_null ()) {
720 const int myRank = comm_->getRank ();
721 const int numProcs = comm_->getSize ();
730 Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
736 tab0 = Teuchos::rcp (
new Teuchos::OSTab (out));
739 out <<
"\"Tpetra::Distributor\":" << endl;
740 tab1 = Teuchos::rcp (
new Teuchos::OSTab (out));
742 const std::string label = this->getObjectLabel ();
744 out <<
"Label: " << label << endl;
746 out <<
"Number of processes: " << numProcs << endl
747 <<
"How initialized: "
751 out <<
"Parameters: " << endl;
752 Teuchos::OSTab tab2 (out);
753 out <<
"\"Send type\": "
754 << DistributorSendTypeEnumToString (sendType_) << endl
755 <<
"\"Barrier between receives and sends\": "
756 << (barrierBetween_ ?
"true" :
"false") << endl
757 <<
"\"Use distinct tags\": "
758 << (useDistinctTags_ ?
"true" :
"false") << endl
759 <<
"\"Debug\": " << (verbose_ ?
"true" :
"false") << endl;
765 const std::string lclStr = this->localDescribeToString (vl);
769 out <<
"Reverse Distributor:";
770 if (reverseDistributor_.is_null ()) {
771 out <<
" null" << endl;
775 reverseDistributor_->describe (out, vl);
783 using Teuchos::Array;
784 using Teuchos::ArrayRCP;
786 using Teuchos::CommStatus;
787 using Teuchos::CommRequest;
788 using Teuchos::ireceive;
791 using Teuchos::REDUCE_SUM;
792 using Teuchos::receive;
793 using Teuchos::reduce;
794 using Teuchos::scatter;
796 using Teuchos::waitAll;
799 const int myRank = comm_->getRank();
800 const int numProcs = comm_->getSize();
803 const int pathTag = 2;
804 const int tag = this->getTag (pathTag);
806 std::unique_ptr<std::string> prefix;
808 prefix = createPrefix(
"computeReceives");
809 std::ostringstream os;
811 <<
"selfMessage_: " << (selfMessage_ ?
"true" :
"false")
812 <<
", pathTag: " << pathTag <<
", tag: " << tag << endl;
813 std::cerr << os.str();
822 Array<int> toProcsFromMe (numProcs, 0);
823 #ifdef HAVE_TEUCHOS_DEBUG
824 bool counting_error =
false;
825 #endif // HAVE_TEUCHOS_DEBUG
826 for (
size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
827 #ifdef HAVE_TEUCHOS_DEBUG
828 if (toProcsFromMe[procsTo_[i]] != 0) {
829 counting_error =
true;
831 #endif // HAVE_TEUCHOS_DEBUG
832 toProcsFromMe[procsTo_[i]] = 1;
834 #ifdef HAVE_TEUCHOS_DEBUG
836 "Tpetra::Distributor::computeReceives: There was an error on at least "
837 "one process in counting the number of messages send by that process to "
838 "the other processs. Please report this bug to the Tpetra developers.",
840 #endif // HAVE_TEUCHOS_DEBUG
843 std::ostringstream os;
844 os << *prefix <<
"Reduce & scatter" << endl;
845 std::cerr << os.str();
901 Array<int> numRecvsOnEachProc;
902 if (myRank == root) {
903 numRecvsOnEachProc.resize (numProcs);
905 int numReceivesAsInt = 0;
906 reduce<int, int> (toProcsFromMe.getRawPtr (),
907 numRecvsOnEachProc.getRawPtr (),
908 numProcs, REDUCE_SUM, root, *comm_);
909 scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
910 &numReceivesAsInt, 1, root, *comm_);
911 numReceives_ =
static_cast<size_t> (numReceivesAsInt);
917 lengthsFrom_.assign (numReceives_, 0);
918 procsFrom_.assign (numReceives_, 0);
934 const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
940 Array<RCP<CommRequest<int> > > requests (actualNumReceives);
941 Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
942 Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
947 const int anySourceProc = MPI_ANY_SOURCE;
949 const int anySourceProc = -1;
953 std::ostringstream os;
954 os << *prefix <<
"Post " << actualNumReceives <<
" irecv"
955 << (actualNumReceives != size_t (1) ?
"s" :
"") << endl;
956 std::cerr << os.str();
960 for (
size_t i = 0; i < actualNumReceives; ++i) {
965 lengthsFromBuffers[i].resize (1);
966 lengthsFromBuffers[i][0] = as<size_t> (0);
967 requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
970 std::ostringstream os;
971 os << *prefix <<
"Posted any-proc irecv w/ tag " << tag << endl;
972 std::cerr << os.str();
977 std::ostringstream os;
978 os << *prefix <<
"Post " << numSends_ <<
" send"
979 << (numSends_ != size_t (1) ?
"s" :
"") << endl;
980 std::cerr << os.str();
990 for (
size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
991 if (procsTo_[i] != myRank) {
995 const size_t*
const lengthsTo_i = &lengthsTo_[i];
996 send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
998 std::ostringstream os;
999 os << *prefix <<
"Posted send to Proc " << procsTo_[i] <<
" w/ tag "
1001 std::cerr << os.str();
1011 lengthsFrom_[numReceives_-1] = lengthsTo_[i];
1012 procsFrom_[numReceives_-1] = myRank;
1017 std::ostringstream os;
1018 const size_t numReq = requests.size();
1019 os << *prefix <<
"waitAll on " << numReq <<
" request"
1020 << (numReq != size_t(1) ?
"s" :
"") << endl;
1021 std::cerr << os.str();
1029 waitAll (*comm_, requests (), statuses ());
1030 for (
size_t i = 0; i < actualNumReceives; ++i) {
1031 lengthsFrom_[i] = *lengthsFromBuffers[i];
1032 procsFrom_[i] = statuses[i]->getSourceRank ();
1038 sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
// Sum of all incoming message lengths.  Seed with a size_t so that
// std::accumulate adds in size_t; an int seed (plain 0) would carry the
// running sum in int and overflow/truncate for large receive totals.
1041 totalReceiveLength_ =
1042 std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), size_t (0));
1043 indicesFrom_.clear ();
1049 indicesFrom_.reserve (totalReceiveLength_);
1050 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1051 indicesFrom_.push_back(i);
1055 startsFrom_.clear ();
1056 startsFrom_.reserve (numReceives_);
1057 for (
size_t i = 0, j = 0; i < numReceives_; ++i) {
1058 startsFrom_.push_back(j);
1059 j += lengthsFrom_[i];
1067 std::ostringstream os;
1068 os << *prefix <<
"Done" << endl;
1069 std::cerr << os.str();
1077 using Teuchos::outArg;
1078 using Teuchos::REDUCE_MAX;
1079 using Teuchos::reduceAll;
1081 const char rawPrefix[] =
"Tpetra::Distributor::createFromSends";
1083 const size_t numExports = exportProcIDs.size();
1084 const int myProcID = comm_->getRank();
1085 const int numProcs = comm_->getSize();
1088 const size_t maxNumToPrint = verbose_ ?
1090 std::unique_ptr<std::string> prefix;
1092 prefix = createPrefix(
"createFromSends");
1093 std::ostringstream os;
1094 os << *prefix <<
"Start: ";
1098 std::cerr << os.str();
1149 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1152 size_t numActive = 0;
1153 int needSendBuff = 0;
1156 for (
size_t i = 0; i < numExports; ++i) {
1157 const int exportID = exportProcIDs[i];
1158 if (exportID >= numProcs) {
1162 else if (exportID >= 0) {
1176 if (needSendBuff == 0 && starts[exportID] > 1 &&
1177 exportID != exportProcIDs[i-1]) {
1190 reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1191 TEUCHOS_TEST_FOR_EXCEPTION
1192 (gbl_badID >= 0, std::runtime_error, rawPrefix <<
"Proc "
1193 << gbl_badID <<
", perhaps among other processes, got a bad "
1194 "send process ID.");
1210 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1212 int global_needSendBuff;
1213 reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1214 outArg (global_needSendBuff));
1216 global_needSendBuff != 0, std::runtime_error,
1217 "::createFromSends: Grouping export IDs together by process rank often "
1218 "improves performance.");
1223 std::ostringstream os;
1224 os << *prefix <<
"Detect whether I have a self message" << endl;
1225 std::cerr << os.str();
1230 if (starts[myProcID] != 0) {
1231 selfMessage_ =
true;
1234 selfMessage_ =
false;
1237 bool index_neq_numActive =
false;
1238 bool send_neq_numSends =
false;
1239 if (! needSendBuff) {
1241 std::ostringstream os;
1242 os << *prefix <<
"I don't need a send buffer or indicesTo_ "
1243 "(fast path)" << endl;
1244 std::cerr << os.str();
1250 for (
int i = 0; i < numProcs; ++i) {
1258 indicesTo_.resize(0);
1261 procsTo_.assign(numSends_,0);
1262 startsTo_.assign(numSends_,0);
1263 lengthsTo_.assign(numSends_,0);
1270 size_t index = 0, procIndex = 0;
1271 for (
size_t i = 0; i < numSends_; ++i) {
1272 while (exportProcIDs[procIndex] < 0) {
1275 startsTo_[i] = procIndex;
1276 int procID = exportProcIDs[procIndex];
1277 procsTo_[i] = procID;
1278 index += starts[procID];
1279 procIndex += starts[procID];
1281 if (index != numActive) {
1282 index_neq_numActive =
true;
1287 if (numSends_ > 0) {
1288 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1292 for (
size_t i = 0; i < numSends_; ++i) {
1293 int procID = procsTo_[i];
1294 lengthsTo_[i] = starts[procID];
1295 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1296 maxSendLength_ = lengthsTo_[i];
1302 std::ostringstream os;
1303 os << *prefix <<
"I need a send buffer & indicesTo_ "
1304 "(slow path)" << endl;
1305 std::cerr << os.str();
1313 if (starts[0] == 0 ) {
1319 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1321 i != starts.end(); ++i)
1323 if (*i != 0) ++numSends_;
1329 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1330 i=starts.rbegin()+1;
1331 i != starts.rend(); ++i)
1340 indicesTo_.resize(numActive);
1342 for (
size_t i = 0; i < numExports; ++i) {
1343 if (exportProcIDs[i] >= 0) {
1345 indicesTo_[starts[exportProcIDs[i]]] = i;
1347 ++starts[exportProcIDs[i]];
1359 for (
int proc = numProcs-1; proc != 0; --proc) {
1360 starts[proc] = starts[proc-1];
1363 starts[numProcs] = numActive;
1370 procsTo_.resize(numSends_);
1371 startsTo_.resize(numSends_);
1372 lengthsTo_.resize(numSends_);
1379 for (
int proc = 0; proc < numProcs; ++proc ) {
1380 if (starts[proc+1] != starts[proc]) {
1381 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1382 startsTo_[snd] = starts[proc];
1384 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1385 maxSendLength_ = lengthsTo_[snd];
1387 procsTo_[snd] = proc;
1391 if (snd != numSends_) {
1392 send_neq_numSends =
true;
1397 (index_neq_numActive, std::logic_error,
1398 rawPrefix <<
"logic error. Please notify the Tpetra team.", *comm_);
1400 (send_neq_numSends, std::logic_error,
1401 rawPrefix <<
"logic error. Please notify the Tpetra team.", *comm_);
1406 std::ostringstream os;
1407 os << *prefix <<
"Sending self message; numSends "
1408 << numSends_ <<
" -> " << (numSends_ - 1) << endl;
1409 std::cerr << os.str();
1419 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1422 std::ostringstream os;
1423 os << *prefix <<
"Done; totalReceiveLength_="
1424 << totalReceiveLength_ << endl;
1425 std::cerr << os.str();
1427 return totalReceiveLength_;
1433 const Teuchos::ArrayView<const int>& remoteProcIDs)
1435 std::unique_ptr<std::string> prefix;
1437 prefix = createPrefix(
"createFromSendsAndRecvs");
1438 std::ostringstream os;
1439 os << *prefix <<
"Start" << std::endl;
1440 std::cerr << os.str();
1450 howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1453 int myProcID = comm_->getRank ();
1454 int numProcs = comm_->getSize();
1456 const size_t numExportIDs = exportProcIDs.size();
1457 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1459 size_t numActive = 0;
1460 int needSendBuff = 0;
1462 for(
size_t i = 0; i < numExportIDs; i++ )
1464 if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1466 if( exportProcIDs[i] >= 0 )
1468 ++starts[ exportProcIDs[i] ];
1473 selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
1479 if (starts[0] == 0 ) {
1485 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1487 i != starts.end(); ++i)
1489 if (*i != 0) ++numSends_;
1495 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1496 i=starts.rbegin()+1;
1497 i != starts.rend(); ++i)
1506 indicesTo_.resize(numActive);
1508 for (
size_t i = 0; i < numExportIDs; ++i) {
1509 if (exportProcIDs[i] >= 0) {
1511 indicesTo_[starts[exportProcIDs[i]]] = i;
1513 ++starts[exportProcIDs[i]];
1516 for (
int proc = numProcs-1; proc != 0; --proc) {
1517 starts[proc] = starts[proc-1];
1520 starts[numProcs] = numActive;
1521 procsTo_.resize(numSends_);
1522 startsTo_.resize(numSends_);
1523 lengthsTo_.resize(numSends_);
1526 for (
int proc = 0; proc < numProcs; ++proc ) {
1527 if (starts[proc+1] != starts[proc]) {
1528 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1529 startsTo_[snd] = starts[proc];
1531 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1532 maxSendLength_ = lengthsTo_[snd];
1534 procsTo_[snd] = proc;
1544 for (
int i = 0; i < numProcs; ++i) {
1552 indicesTo_.resize(0);
1555 procsTo_.assign(numSends_,0);
1556 startsTo_.assign(numSends_,0);
1557 lengthsTo_.assign(numSends_,0);
1564 size_t index = 0, procIndex = 0;
1565 for (
size_t i = 0; i < numSends_; ++i) {
1566 while (exportProcIDs[procIndex] < 0) {
1569 startsTo_[i] = procIndex;
1570 int procID = exportProcIDs[procIndex];
1571 procsTo_[i] = procID;
1572 index += starts[procID];
1573 procIndex += starts[procID];
1578 if (numSends_ > 0) {
1579 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1583 for (
size_t i = 0; i < numSends_; ++i) {
1584 int procID = procsTo_[i];
1585 lengthsTo_[i] = starts[procID];
1586 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1587 maxSendLength_ = lengthsTo_[i];
1593 numSends_ -= selfMessage_;
1594 std::vector<int> recv_list;
1595 recv_list.reserve(numSends_);
1598 for(
int i=0; i<remoteProcIDs.size(); i++) {
1599 if(remoteProcIDs[i]>last_pid) {
1600 recv_list.push_back(remoteProcIDs[i]);
1601 last_pid = remoteProcIDs[i];
1603 else if (remoteProcIDs[i]<last_pid)
1604 throw std::runtime_error(
"Tpetra::Distributor:::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
1606 numReceives_ = recv_list.size();
1608 procsFrom_.assign(numReceives_,0);
1609 lengthsFrom_.assign(numReceives_,0);
1610 indicesFrom_.assign(numReceives_,0);
1611 startsFrom_.assign(numReceives_,0);
1613 for(
size_t i=0,j=0; i<numReceives_; ++i) {
1615 procsFrom_[i] = recv_list[i];
1617 for( ; j<(size_t)remoteProcIDs.size() &&
1618 remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
1619 lengthsFrom_[i] = j-jlast;
1621 totalReceiveLength_ = remoteProcIDs.size();
1622 indicesFrom_.clear ();
1628 indicesFrom_.reserve (totalReceiveLength_);
1629 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1630 indicesFrom_.push_back(i);
1633 numReceives_-=selfMessage_;
1636 std::ostringstream os;
1637 os << *prefix <<
"Done" << std::endl;
1638 std::cerr << os.str();
size_t getNumReceives() const
The number of processes from which we will receive data.
std::string description() const
Return a one-line description of this object.
Teuchos::RCP< Distributor > getReverse(bool create=true) const
A reverse communication plan Distributor.
Declaration of a function that prints strings from each process.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
static bool debug()
Whether Tpetra is in debug mode.
void verbosePrintArray(std::ostream &out, const ArrayType &x, const char name[], const size_t maxNumToPrint)
Print min(x.size(), maxNumToPrint) entries of x.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
Teuchos::ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
Teuchos::ArrayView< const int > getProcsFrom() const
Ranks of the processes sending values to this process.
size_t createFromSends(const Teuchos::ArrayView< const int > &exportProcIDs)
Set up Distributor using list of process ranks to which this process will send.
void gathervPrint(std::ostream &out, const std::string &s, const Teuchos::Comm< int > &comm)
On Process 0 in the given communicator, print strings from each process in that communicator, in rank order.
Teuchos::ArrayView< const int > getProcsTo() const
Ranks of the processes to which this process will send values.
void createFromSendsAndRecvs(const Teuchos::ArrayView< const int > &exportProcIDs, const Teuchos::ArrayView< const int > &remoteProcIDs)
Set up Distributor using list of process ranks to which to send, and list of process ranks from which to receive.
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
Sets up and executes a communication plan for a Tpetra DistObject.
static bool verbose()
Whether Tpetra is in verbose mode.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
Teuchos::ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
Stand-alone utility functions and macros.
static size_t verbosePrintCountThreshold()
Number of entries below which arrays, lists, etc. will be printed in debug mode.
size_t getNumSends() const
The number of processes to which we will send data.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Describe this object in a human-readable way to the given output stream.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
std::unique_ptr< std::string > createPrefix(const int myRank, const char prefix[])
Create string prefix for each line of verbose output.
EDistributorSendType
The type of MPI send that Distributor should use.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.
Declaration of Tpetra::Details::Behavior, a class that describes Tpetra's behavior.