41 #include "Tpetra_Distributor.hpp"
42 #include "Tpetra_Details_gathervPrint.hpp"
43 #include "Tpetra_Details_makeValidVerboseStream.hpp"
44 #include "Teuchos_StandardParameterEntryValidators.hpp"
45 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
53 if (sendType == DISTRIBUTOR_ISEND) {
56 else if (sendType == DISTRIBUTOR_RSEND) {
59 else if (sendType == DISTRIBUTOR_SEND) {
62 else if (sendType == DISTRIBUTOR_SSEND) {
66 TEUCHOS_TEST_FOR_EXCEPTION(
true, std::invalid_argument,
"Invalid "
67 "EDistributorSendType enum value " << sendType <<
".");
75 case Details::DISTRIBUTOR_NOT_INITIALIZED:
76 return "Not initialized yet";
77 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
78 return "By createFromSends";
79 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
80 return "By createFromRecvs";
81 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
82 return "By createFromSendsAndRecvs";
83 case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
84 return "By createReverseDistributor";
85 case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
86 return "By copy constructor";
93 Teuchos::Array<std::string>
96 Teuchos::Array<std::string> sendTypes;
97 sendTypes.push_back (
"Isend");
98 sendTypes.push_back (
"Rsend");
99 sendTypes.push_back (
"Send");
100 sendTypes.push_back (
"Ssend");
// Default values for Distributor's ParameterList-configurable options.
// constexpr (rather than const) guarantees compile-time constants with
// no runtime initialization.

// Default for the "Debug" parameter (copious debug output).
constexpr bool tpetraDistributorDebugDefault = false;
// Default for the "Barrier between receives and sends" parameter.
constexpr bool barrierBetween_default = false;
// Default for the "Use distinct tags" parameter (distinct MPI message
// tags per code path, to avoid message collisions).
constexpr bool useDistinctTags_default = true;
117 int Distributor::getTag (
const int pathTag)
const {
118 return useDistinctTags_ ? pathTag : comm_->getTag ();
#ifdef TPETRA_DISTRIBUTOR_TIMERS
// Create the Teuchos::TimeMonitor timers used to instrument
// doPosts(3), doPosts(4), and doWaits().  Compiled only when
// TPETRA_DISTRIBUTOR_TIMERS is defined.
// (Closing brace restored; it was lost in extraction.)
void Distributor::makeTimers () {
  const std::string name_doPosts3 = "Tpetra::Distributor: doPosts(3)";
  const std::string name_doPosts4 = "Tpetra::Distributor: doPosts(4)";
  const std::string name_doWaits = "Tpetra::Distributor: doWaits";
  const std::string name_doPosts3_recvs = "Tpetra::Distributor: doPosts(3): recvs";
  const std::string name_doPosts4_recvs = "Tpetra::Distributor: doPosts(4): recvs";
  const std::string name_doPosts3_barrier = "Tpetra::Distributor: doPosts(3): barrier";
  const std::string name_doPosts4_barrier = "Tpetra::Distributor: doPosts(4): barrier";
  const std::string name_doPosts3_sends = "Tpetra::Distributor: doPosts(3): sends";
  const std::string name_doPosts4_sends = "Tpetra::Distributor: doPosts(4): sends";

  timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
  timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
  timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
  timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
  timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
  timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
  timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
  timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
  timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
}
#endif // TPETRA_DISTRIBUTOR_TIMERS
148 const Teuchos::RCP<Teuchos::FancyOStream>& out,
149 const Teuchos::RCP<Teuchos::ParameterList>& plist)
151 , out_ (::Tpetra::Details::makeValidVerboseStream (out))
152 , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
153 , sendType_ (Details::DISTRIBUTOR_SEND)
154 , barrierBetween_ (barrierBetween_default)
155 , verbose_ (tpetraDistributorDebugDefault)
156 , selfMessage_ (false)
160 , totalReceiveLength_ (0)
161 , lastRoundBytesSend_ (0)
162 , lastRoundBytesRecv_ (0)
163 , useDistinctTags_ (useDistinctTags_default)
165 TEUCHOS_ASSERT( ! out_.is_null () );
168 #ifdef TPETRA_DISTRIBUTOR_TIMERS
170 #endif // TPETRA_DISTRIBUTOR_TIMERS
180 const Teuchos::RCP<Teuchos::FancyOStream>& out)
186 const Teuchos::RCP<Teuchos::ParameterList>& plist)
192 : comm_ (distributor.comm_)
193 , out_ (distributor.out_)
194 , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
195 , sendType_ (distributor.sendType_)
196 , barrierBetween_ (distributor.barrierBetween_)
197 , verbose_ (distributor.verbose_)
198 , selfMessage_ (distributor.selfMessage_)
199 , numSends_ (distributor.numSends_)
200 , procsTo_ (distributor.procsTo_)
201 , startsTo_ (distributor.startsTo_)
202 , lengthsTo_ (distributor.lengthsTo_)
203 , maxSendLength_ (distributor.maxSendLength_)
204 , indicesTo_ (distributor.indicesTo_)
205 , numReceives_ (distributor.numReceives_)
206 , totalReceiveLength_ (distributor.totalReceiveLength_)
207 , lengthsFrom_ (distributor.lengthsFrom_)
208 , procsFrom_ (distributor.procsFrom_)
209 , startsFrom_ (distributor.startsFrom_)
210 , indicesFrom_ (distributor.indicesFrom_)
211 , reverseDistributor_ (distributor.reverseDistributor_)
212 , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
213 , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
214 , useDistinctTags_ (distributor.useDistinctTags_)
216 using Teuchos::ParameterList;
220 TEUCHOS_ASSERT( ! out_.is_null () );
222 RCP<const ParameterList> rhsList = distributor.getParameterList ();
223 RCP<ParameterList> newList = rhsList.is_null () ? Teuchos::null :
224 Teuchos::parameterList (*rhsList);
227 #ifdef TPETRA_DISTRIBUTOR_TIMERS
229 #endif // TPETRA_DISTRIBUTOR_TIMERS
233 using Teuchos::ParameterList;
234 using Teuchos::parameterList;
237 std::swap (comm_, rhs.comm_);
238 std::swap (out_, rhs.out_);
239 std::swap (howInitialized_, rhs.howInitialized_);
240 std::swap (sendType_, rhs.sendType_);
241 std::swap (barrierBetween_, rhs.barrierBetween_);
242 std::swap (verbose_, rhs.verbose_);
243 std::swap (selfMessage_, rhs.selfMessage_);
244 std::swap (numSends_, rhs.numSends_);
245 std::swap (procsTo_, rhs.procsTo_);
246 std::swap (startsTo_, rhs.startsTo_);
247 std::swap (lengthsTo_, rhs.lengthsTo_);
248 std::swap (maxSendLength_, rhs.maxSendLength_);
249 std::swap (indicesTo_, rhs.indicesTo_);
250 std::swap (numReceives_, rhs.numReceives_);
251 std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
252 std::swap (lengthsFrom_, rhs.lengthsFrom_);
253 std::swap (procsFrom_, rhs.procsFrom_);
254 std::swap (startsFrom_, rhs.startsFrom_);
255 std::swap (indicesFrom_, rhs.indicesFrom_);
256 std::swap (reverseDistributor_, rhs.reverseDistributor_);
257 std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
258 std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
259 std::swap (useDistinctTags_, rhs.useDistinctTags_);
263 RCP<ParameterList> lhsList = this->getNonconstParameterList ();
264 RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
265 if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
266 rhsList = parameterList (*rhsList);
268 if (! rhsList.is_null ()) {
269 this->setMyParamList (rhsList);
271 if (! lhsList.is_null ()) {
272 rhs.setMyParamList (lhsList);
283 using ::Tpetra::Details::Behavior;
284 using Teuchos::FancyOStream;
285 using Teuchos::getIntegralValue;
286 using Teuchos::includesVerbLevel;
287 using Teuchos::OSTab;
288 using Teuchos::ParameterList;
289 using Teuchos::parameterList;
293 const bool verboseDefault = Behavior::verbose (
"Distributor") ||
294 Behavior::verbose (
"Tpetra::Distributor");
296 if (plist.is_null ()) {
297 verbose_ = verboseDefault;
301 plist->validateParametersAndSetDefaults (*validParams);
303 const bool barrierBetween =
304 plist->get<
bool> (
"Barrier between receives and sends");
306 getIntegralValue<Details::EDistributorSendType> (*plist,
"Send type");
307 const bool useDistinctTags = plist->get<
bool> (
"Use distinct tags");
308 const bool debug = plist->get<
bool> (
"Debug");
313 const bool enable_cuda_rdma =
314 plist->get<
bool> (
"Enable MPI CUDA RDMA support");
315 TEUCHOS_TEST_FOR_EXCEPTION
316 (! enable_cuda_rdma, std::invalid_argument,
"Tpetra::Distributor::"
317 "setParameterList: " <<
"You specified \"Enable MPI CUDA RDMA "
318 "support\" = false. This is no longer valid. You don't need to "
319 "specify this option any more; Tpetra assumes it is always true. "
320 "This is a very light assumption on the MPI implementation, and in "
321 "fact does not actually involve hardware or system RDMA support. "
322 "Tpetra just assumes that the MPI implementation can tell whether a "
323 "pointer points to host memory or CUDA device memory.");
330 TEUCHOS_TEST_FOR_EXCEPTION
331 (! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
332 std::invalid_argument,
"Tpetra::Distributor::setParameterList: " << endl
333 <<
"You specified \"Send type\"=\"Rsend\", but turned off the barrier "
334 "between receives and sends." << endl <<
"This is invalid; you must "
335 "include the barrier if you use ready sends." << endl <<
"Ready sends "
336 "require that their corresponding receives have already been posted, "
337 "and the only way to guarantee that in general is with a barrier.");
340 sendType_ = sendType;
341 barrierBetween_ = barrierBetween;
342 useDistinctTags_ = useDistinctTags;
343 verbose_ = debug || verboseDefault;
347 this->setMyParamList (plist);
351 Teuchos::RCP<const Teuchos::ParameterList>
354 using Teuchos::Array;
355 using Teuchos::ParameterList;
356 using Teuchos::parameterList;
358 using Teuchos::setStringToIntegralParameter;
360 const bool barrierBetween = barrierBetween_default;
361 const bool useDistinctTags = useDistinctTags_default;
362 const bool debug = tpetraDistributorDebugDefault;
365 const std::string defaultSendType (
"Send");
366 Array<Details::EDistributorSendType> sendTypeEnums;
367 sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
368 sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
369 sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
370 sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
372 RCP<ParameterList> plist = parameterList (
"Tpetra::Distributor");
373 plist->set (
"Barrier between receives and sends", barrierBetween,
374 "Whether to execute a barrier between receives and sends in do"
375 "[Reverse]Posts(). Required for correctness when \"Send type\""
376 "=\"Rsend\", otherwise correct but not recommended.");
377 setStringToIntegralParameter<Details::EDistributorSendType> (
"Send type",
378 defaultSendType,
"When using MPI, the variant of send to use in "
379 "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
380 plist->set (
"Use distinct tags", useDistinctTags,
"Whether to use distinct "
381 "MPI message tags for different code paths. Highly recommended"
382 " to avoid message collisions.");
383 plist->set (
"Debug", debug,
"Whether to print copious debugging output on "
385 plist->set (
"Timer Label",
"",
"Label for Time Monitor output");
386 plist->set (
"Enable MPI CUDA RDMA support",
true,
"Assume that MPI can "
387 "tell whether a pointer points to host memory or CUDA device "
388 "memory. You don't need to specify this option any more; "
389 "Tpetra assumes it is always true. This is a very light "
390 "assumption on the MPI implementation, and in fact does not "
391 "actually involve hardware or system RDMA support.");
399 Teuchos::setupVerboseObjectSublist (&*plist);
400 return Teuchos::rcp_const_cast<
const ParameterList> (plist);
405 {
return totalReceiveLength_; }
408 {
return numReceives_; }
411 {
return selfMessage_; }
414 {
return numSends_; }
417 {
return maxSendLength_; }
420 {
return procsFrom_; }
423 {
return lengthsFrom_; }
429 {
return lengthsTo_; }
431 Teuchos::RCP<Distributor>
433 if (reverseDistributor_.is_null ()) {
434 createReverseDistributor ();
436 TEUCHOS_TEST_FOR_EXCEPTION
437 (reverseDistributor_.is_null (), std::logic_error,
"The reverse "
438 "Distributor is null after createReverseDistributor returned. "
439 "Please report this bug to the Tpetra developers.");
440 return reverseDistributor_;
445 Distributor::createReverseDistributor()
const
447 reverseDistributor_ = Teuchos::rcp (
new Distributor (comm_, out_));
448 reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
449 reverseDistributor_->sendType_ = sendType_;
450 reverseDistributor_->barrierBetween_ = barrierBetween_;
451 reverseDistributor_->verbose_ = verbose_;
456 size_t totalSendLength =
457 std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
462 size_t maxReceiveLength = 0;
463 const int myProcID = comm_->getRank();
464 for (
size_t i=0; i < numReceives_; ++i) {
465 if (procsFrom_[i] != myProcID) {
467 if (lengthsFrom_[i] > maxReceiveLength) {
468 maxReceiveLength = lengthsFrom_[i];
477 reverseDistributor_->selfMessage_ = selfMessage_;
478 reverseDistributor_->numSends_ = numReceives_;
479 reverseDistributor_->procsTo_ = procsFrom_;
480 reverseDistributor_->startsTo_ = startsFrom_;
481 reverseDistributor_->lengthsTo_ = lengthsFrom_;
482 reverseDistributor_->maxSendLength_ = maxReceiveLength;
483 reverseDistributor_->indicesTo_ = indicesFrom_;
484 reverseDistributor_->numReceives_ = numSends_;
485 reverseDistributor_->totalReceiveLength_ = totalSendLength;
486 reverseDistributor_->lengthsFrom_ = lengthsTo_;
487 reverseDistributor_->procsFrom_ = procsTo_;
488 reverseDistributor_->startsFrom_ = startsTo_;
489 reverseDistributor_->indicesFrom_ = indicesTo_;
498 reverseDistributor_->lastRoundBytesSend_ = 0;
499 reverseDistributor_->lastRoundBytesRecv_ = 0;
501 reverseDistributor_->useDistinctTags_ = useDistinctTags_;
514 reverseDistributor_->reverseDistributor_ = Teuchos::null;
519 using Teuchos::Array;
520 using Teuchos::CommRequest;
521 using Teuchos::FancyOStream;
522 using Teuchos::includesVerbLevel;
523 using Teuchos::is_null;
524 using Teuchos::OSTab;
526 using Teuchos::waitAll;
529 Teuchos::OSTab tab (out_);
531 #ifdef TPETRA_DISTRIBUTOR_TIMERS
532 Teuchos::TimeMonitor timeMon (*timer_doWaits_);
533 #endif // TPETRA_DISTRIBUTOR_TIMERS
535 const int myRank = comm_->getRank ();
538 std::ostringstream os;
539 os << myRank <<
": doWaits: # reqs = "
540 << requests_.size () << endl;
544 if (requests_.size() > 0) {
545 waitAll (*comm_, requests_());
547 #ifdef HAVE_TEUCHOS_DEBUG
549 for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
550 it != requests_.end(); ++it)
552 TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
553 Teuchos::typeName(*
this) <<
"::doWaits(): Communication requests "
554 "should all be null aftr calling Teuchos::waitAll() on them, but "
555 "at least one request is not null.");
557 #endif // HAVE_TEUCHOS_DEBUG
560 requests_.resize (0);
563 #ifdef HAVE_TEUCHOS_DEBUG
565 const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
566 int globalSizeNonzero = 0;
567 Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
569 Teuchos::outArg (globalSizeNonzero));
570 TEUCHOS_TEST_FOR_EXCEPTION(
571 globalSizeNonzero != 0, std::runtime_error,
572 "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
573 "a nonzero number of outstanding posts. There should be none at this "
574 "point. Please report this bug to the Tpetra developers.");
576 #endif // HAVE_TEUCHOS_DEBUG
579 std::ostringstream os;
580 os << myRank <<
": doWaits done" << endl;
587 if (! reverseDistributor_.is_null()) {
588 reverseDistributor_->doWaits();
593 std::ostringstream out;
595 out <<
"\"Tpetra::Distributor\": {";
596 const std::string label = this->getObjectLabel ();
598 out <<
"Label: " << label <<
", ";
600 out <<
"How initialized: "
604 << DistributorSendTypeEnumToString (sendType_)
605 <<
", Barrier between receives and sends: "
606 << (barrierBetween_ ?
"true" :
"false")
607 <<
", Use distinct tags: "
608 << (useDistinctTags_ ?
"true" :
"false")
609 <<
", Debug: " << (verbose_ ?
"true" :
"false")
616 localDescribeToString (
const Teuchos::EVerbosityLevel vl)
const
618 using Teuchos::toString;
619 using Teuchos::VERB_HIGH;
620 using Teuchos::VERB_EXTREME;
624 if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
625 return std::string ();
628 auto outStringP = Teuchos::rcp (
new std::ostringstream ());
629 auto outp = Teuchos::getFancyOStream (outStringP);
630 Teuchos::FancyOStream& out = *outp;
632 const int myRank = comm_->getRank ();
633 const int numProcs = comm_->getSize ();
634 out <<
"Process " << myRank <<
" of " << numProcs <<
":" << endl;
635 Teuchos::OSTab tab1 (out);
639 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
640 out <<
"procsTo: " << toString (procsTo_) << endl;
641 out <<
"lengthsTo: " << toString (lengthsTo_) << endl;
644 if (vl == VERB_EXTREME) {
645 out <<
"startsTo: " << toString (startsTo_) << endl;
646 out <<
"indicesTo: " << toString (indicesTo_) << endl;
648 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
651 out <<
"lengthsFrom: " << toString (lengthsFrom_) << endl;
652 out <<
"startsFrom: " << toString (startsFrom_) << endl;
653 out <<
"procsFrom: " << toString (procsFrom_) << endl;
657 return outStringP->str ();
663 const Teuchos::EVerbosityLevel verbLevel)
const
666 using Teuchos::VERB_DEFAULT;
667 using Teuchos::VERB_NONE;
668 using Teuchos::VERB_LOW;
669 using Teuchos::VERB_MEDIUM;
670 using Teuchos::VERB_HIGH;
671 using Teuchos::VERB_EXTREME;
672 const Teuchos::EVerbosityLevel vl =
673 (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
675 if (vl == VERB_NONE) {
683 if (comm_.is_null ()) {
686 const int myRank = comm_->getRank ();
687 const int numProcs = comm_->getSize ();
696 Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
702 tab0 = Teuchos::rcp (
new Teuchos::OSTab (out));
705 out <<
"\"Tpetra::Distributor\":" << endl;
706 tab1 = Teuchos::rcp (
new Teuchos::OSTab (out));
708 const std::string label = this->getObjectLabel ();
710 out <<
"Label: " << label << endl;
712 out <<
"Number of processes: " << numProcs << endl
713 <<
"How initialized: "
717 out <<
"Parameters: " << endl;
718 Teuchos::OSTab tab2 (out);
719 out <<
"\"Send type\": "
720 << DistributorSendTypeEnumToString (sendType_) << endl
721 <<
"\"Barrier between receives and sends\": "
722 << (barrierBetween_ ?
"true" :
"false") << endl
723 <<
"\"Use distinct tags\": "
724 << (useDistinctTags_ ?
"true" :
"false") << endl
725 <<
"\"Debug\": " << (verbose_ ?
"true" :
"false") << endl;
731 const std::string lclStr = this->localDescribeToString (vl);
735 out <<
"Reverse Distributor:";
736 if (reverseDistributor_.is_null ()) {
737 out <<
" null" << endl;
741 reverseDistributor_->describe (out, vl);
749 using Teuchos::Array;
750 using Teuchos::ArrayRCP;
752 using Teuchos::CommStatus;
753 using Teuchos::CommRequest;
754 using Teuchos::ireceive;
757 using Teuchos::REDUCE_SUM;
758 using Teuchos::receive;
759 using Teuchos::reduce;
760 using Teuchos::scatter;
762 using Teuchos::waitAll;
765 Teuchos::OSTab tab (out_);
766 const int myRank = comm_->getRank();
767 const int numProcs = comm_->getSize();
770 const int pathTag = 2;
771 const int tag = this->getTag (pathTag);
773 std::unique_ptr<std::string> prefix;
775 std::ostringstream os;
776 os <<
"Proc " << myRank <<
": computeReceives: ";
777 prefix = std::unique_ptr<std::string> (
new std::string (os.str ()));
778 os <<
"{selfMessage_: " << (selfMessage_ ?
"true" :
"false")
779 <<
", tag: " << tag <<
"}" << endl;
789 Array<int> toProcsFromMe (numProcs, 0);
790 #ifdef HAVE_TEUCHOS_DEBUG
791 bool counting_error =
false;
792 #endif // HAVE_TEUCHOS_DEBUG
793 for (
size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
794 #ifdef HAVE_TEUCHOS_DEBUG
795 if (toProcsFromMe[procsTo_[i]] != 0) {
796 counting_error =
true;
798 #endif // HAVE_TEUCHOS_DEBUG
799 toProcsFromMe[procsTo_[i]] = 1;
801 #ifdef HAVE_TEUCHOS_DEBUG
803 "Tpetra::Distributor::computeReceives: There was an error on at least "
804 "one process in counting the number of messages send by that process to "
805 "the other processs. Please report this bug to the Tpetra developers.",
807 #endif // HAVE_TEUCHOS_DEBUG
810 std::ostringstream os;
811 os << *prefix <<
"Reduce & scatter" << endl;
868 Array<int> numRecvsOnEachProc;
869 if (myRank == root) {
870 numRecvsOnEachProc.resize (numProcs);
872 int numReceivesAsInt = 0;
873 reduce<int, int> (toProcsFromMe.getRawPtr (),
874 numRecvsOnEachProc.getRawPtr (),
875 numProcs, REDUCE_SUM, root, *comm_);
876 scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
877 &numReceivesAsInt, 1, root, *comm_);
878 numReceives_ =
static_cast<size_t> (numReceivesAsInt);
884 lengthsFrom_.assign (numReceives_, 0);
885 procsFrom_.assign (numReceives_, 0);
901 const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
907 Array<RCP<CommRequest<int> > > requests (actualNumReceives);
908 Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
909 Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
914 const int anySourceProc = MPI_ANY_SOURCE;
916 const int anySourceProc = -1;
920 std::ostringstream os;
921 os << *prefix <<
"Post " << actualNumReceives <<
" irecv"
922 << (actualNumReceives != size_t (1) ?
"s" :
"") << endl;
927 for (
size_t i = 0; i < actualNumReceives; ++i) {
932 lengthsFromBuffers[i].resize (1);
933 lengthsFromBuffers[i][0] = as<size_t> (0);
934 requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
937 std::ostringstream os;
938 os << *prefix <<
"Posted any-proc irecv w/ tag " << tag << endl;
944 std::ostringstream os;
945 os << *prefix <<
"Post " << numSends_ <<
" send"
946 << (numSends_ != size_t (1) ?
"s" :
"") << endl;
957 for (
size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
958 if (procsTo_[i] != myRank) {
962 const size_t*
const lengthsTo_i = &lengthsTo_[i];
963 send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
965 std::ostringstream os;
966 os << *prefix <<
"Posted send to Proc " << procsTo_[i] <<
" w/ tag "
978 lengthsFrom_[numReceives_-1] = lengthsTo_[i];
979 procsFrom_[numReceives_-1] = myRank;
984 std::ostringstream os;
985 os << myRank <<
": computeReceives: waitAll on "
986 << requests.size () <<
" requests" << endl;
995 waitAll (*comm_, requests (), statuses ());
996 for (
size_t i = 0; i < actualNumReceives; ++i) {
997 lengthsFrom_[i] = *lengthsFromBuffers[i];
998 procsFrom_[i] = statuses[i]->getSourceRank ();
1004 sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
1007 totalReceiveLength_ =
1008 std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
1009 indicesFrom_.clear ();
1015 indicesFrom_.reserve (totalReceiveLength_);
1016 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1017 indicesFrom_.push_back(i);
1021 startsFrom_.clear ();
1022 startsFrom_.reserve (numReceives_);
1023 for (
size_t i = 0, j = 0; i < numReceives_; ++i) {
1024 startsFrom_.push_back(j);
1025 j += lengthsFrom_[i];
1033 std::ostringstream os;
1034 os << *prefix <<
"Done!" << endl;
1043 using Teuchos::outArg;
1044 using Teuchos::REDUCE_MAX;
1045 using Teuchos::reduceAll;
1047 const char rawPrefix[] =
"Tpetra::Distributor::createFromSends: ";
1049 Teuchos::OSTab tab (out_);
1050 const size_t numExports = exportProcIDs.size();
1051 const int myProcID = comm_->getRank();
1052 const int numProcs = comm_->getSize();
1054 std::unique_ptr<std::string> prefix;
1056 std::ostringstream os;
1057 os <<
"Proc " << myProcID <<
": " << rawPrefix <<
": ";
1058 prefix = std::unique_ptr<std::string> (
new std::string (os.str ()));
1059 os <<
"exportPIDs: " << exportProcIDs << endl;
1111 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1114 size_t numActive = 0;
1115 int needSendBuff = 0;
1117 #ifdef HAVE_TPETRA_DEBUG
1119 #endif // HAVE_TPETRA_DEBUG
1120 for (
size_t i = 0; i < numExports; ++i) {
1121 const int exportID = exportProcIDs[i];
1122 if (exportID >= numProcs) {
1123 #ifdef HAVE_TPETRA_DEBUG
1125 #endif // HAVE_TPETRA_DEBUG
1128 else if (exportID >= 0) {
1142 if (needSendBuff == 0 && starts[exportID] > 1 &&
1143 exportID != exportProcIDs[i-1]) {
1150 #ifdef HAVE_TPETRA_DEBUG
1157 reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1158 TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
1159 Teuchos::typeName(*
this) <<
"::createFromSends: Proc " << gbl_badID
1160 <<
", perhaps among other processes, got a bad send process ID.");
1175 #endif // HAVE_TPETRA_DEBUG
1177 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1179 int global_needSendBuff;
1180 reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1181 outArg (global_needSendBuff));
1183 global_needSendBuff != 0, std::runtime_error,
1184 "::createFromSends: Grouping export IDs together by process rank often "
1185 "improves performance.");
1191 if (starts[myProcID] != 0) {
1192 selfMessage_ =
true;
1195 selfMessage_ =
false;
1198 #ifdef HAVE_TEUCHOS_DEBUG
1199 bool index_neq_numActive =
false;
1200 bool send_neq_numSends =
false;
1202 if (! needSendBuff) {
1207 for (
int i = 0; i < numProcs; ++i) {
1215 indicesTo_.resize(0);
1218 procsTo_.assign(numSends_,0);
1219 startsTo_.assign(numSends_,0);
1220 lengthsTo_.assign(numSends_,0);
1227 size_t index = 0, procIndex = 0;
1228 for (
size_t i = 0; i < numSends_; ++i) {
1229 while (exportProcIDs[procIndex] < 0) {
1232 startsTo_[i] = procIndex;
1233 int procID = exportProcIDs[procIndex];
1234 procsTo_[i] = procID;
1235 index += starts[procID];
1236 procIndex += starts[procID];
1238 #ifdef HAVE_TEUCHOS_DEBUG
1239 if (index != numActive) {
1240 index_neq_numActive =
true;
1246 if (numSends_ > 0) {
1247 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1251 for (
size_t i = 0; i < numSends_; ++i) {
1252 int procID = procsTo_[i];
1253 lengthsTo_[i] = starts[procID];
1254 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1255 maxSendLength_ = lengthsTo_[i];
1266 if (starts[0] == 0 ) {
1272 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1274 i != starts.end(); ++i)
1276 if (*i != 0) ++numSends_;
1282 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1283 i=starts.rbegin()+1;
1284 i != starts.rend(); ++i)
1293 indicesTo_.resize(numActive);
1295 for (
size_t i = 0; i < numExports; ++i) {
1296 if (exportProcIDs[i] >= 0) {
1298 indicesTo_[starts[exportProcIDs[i]]] = i;
1300 ++starts[exportProcIDs[i]];
1312 for (
int proc = numProcs-1; proc != 0; --proc) {
1313 starts[proc] = starts[proc-1];
1316 starts[numProcs] = numActive;
1323 procsTo_.resize(numSends_);
1324 startsTo_.resize(numSends_);
1325 lengthsTo_.resize(numSends_);
1332 for (
int proc = 0; proc < numProcs; ++proc ) {
1333 if (starts[proc+1] != starts[proc]) {
1334 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1335 startsTo_[snd] = starts[proc];
1337 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1338 maxSendLength_ = lengthsTo_[snd];
1340 procsTo_[snd] = proc;
1344 #ifdef HAVE_TEUCHOS_DEBUG
1345 if (snd != numSends_) {
1346 send_neq_numSends =
true;
1350 #ifdef HAVE_TEUCHOS_DEBUG
1352 "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1354 "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1357 if (selfMessage_) --numSends_;
1363 std::ostringstream os;
1364 os << *prefix <<
"Done!" << endl;
1370 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1372 return totalReceiveLength_;
1378 const Teuchos::ArrayView<const int>& remoteProcIDs)
1387 howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1390 int myProcID = comm_->getRank ();
1391 int numProcs = comm_->getSize();
1393 const size_t numExportIDs = exportProcIDs.size();
1394 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1396 size_t numActive = 0;
1397 int needSendBuff = 0;
1399 for(
size_t i = 0; i < numExportIDs; i++ )
1401 if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1403 if( exportProcIDs[i] >= 0 )
1405 ++starts[ exportProcIDs[i] ];
1410 selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
1416 if (starts[0] == 0 ) {
1422 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1424 i != starts.end(); ++i)
1426 if (*i != 0) ++numSends_;
1432 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1433 i=starts.rbegin()+1;
1434 i != starts.rend(); ++i)
1443 indicesTo_.resize(numActive);
1445 for (
size_t i = 0; i < numExportIDs; ++i) {
1446 if (exportProcIDs[i] >= 0) {
1448 indicesTo_[starts[exportProcIDs[i]]] = i;
1450 ++starts[exportProcIDs[i]];
1453 for (
int proc = numProcs-1; proc != 0; --proc) {
1454 starts[proc] = starts[proc-1];
1457 starts[numProcs] = numActive;
1458 procsTo_.resize(numSends_);
1459 startsTo_.resize(numSends_);
1460 lengthsTo_.resize(numSends_);
1463 for (
int proc = 0; proc < numProcs; ++proc ) {
1464 if (starts[proc+1] != starts[proc]) {
1465 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1466 startsTo_[snd] = starts[proc];
1468 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1469 maxSendLength_ = lengthsTo_[snd];
1471 procsTo_[snd] = proc;
1481 for (
int i = 0; i < numProcs; ++i) {
1489 indicesTo_.resize(0);
1492 procsTo_.assign(numSends_,0);
1493 startsTo_.assign(numSends_,0);
1494 lengthsTo_.assign(numSends_,0);
1501 size_t index = 0, procIndex = 0;
1502 for (
size_t i = 0; i < numSends_; ++i) {
1503 while (exportProcIDs[procIndex] < 0) {
1506 startsTo_[i] = procIndex;
1507 int procID = exportProcIDs[procIndex];
1508 procsTo_[i] = procID;
1509 index += starts[procID];
1510 procIndex += starts[procID];
1515 if (numSends_ > 0) {
1516 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1520 for (
size_t i = 0; i < numSends_; ++i) {
1521 int procID = procsTo_[i];
1522 lengthsTo_[i] = starts[procID];
1523 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1524 maxSendLength_ = lengthsTo_[i];
1530 numSends_ -= selfMessage_;
1531 std::vector<int> recv_list;
1532 recv_list.reserve(numSends_);
1535 for(
int i=0; i<remoteProcIDs.size(); i++) {
1536 if(remoteProcIDs[i]>last_pid) {
1537 recv_list.push_back(remoteProcIDs[i]);
1538 last_pid = remoteProcIDs[i];
1540 else if (remoteProcIDs[i]<last_pid)
1541 throw std::runtime_error(
"Tpetra::Distributor:::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
1543 numReceives_ = recv_list.size();
1545 procsFrom_.assign(numReceives_,0);
1546 lengthsFrom_.assign(numReceives_,0);
1547 indicesFrom_.assign(numReceives_,0);
1548 startsFrom_.assign(numReceives_,0);
1550 for(
size_t i=0,j=0; i<numReceives_; ++i) {
1552 procsFrom_[i] = recv_list[i];
1554 for( ; j<(size_t)remoteProcIDs.size() &&
1555 remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
1556 lengthsFrom_[i] = j-jlast;
1558 totalReceiveLength_ = remoteProcIDs.size();
1559 indicesFrom_.clear ();
1565 indicesFrom_.reserve (totalReceiveLength_);
1566 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1567 indicesFrom_.push_back(i);
1570 numReceives_-=selfMessage_;
size_t getNumReceives() const
The number of processes from which we will receive data.
std::string description() const
Return a one-line description of this object.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
Teuchos::ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
Teuchos::ArrayView< const int > getProcsFrom() const
Ranks of the processes sending values to this process.
size_t createFromSends(const Teuchos::ArrayView< const int > &exportProcIDs)
Set up Distributor using list of process ranks to which this process will send.
void gathervPrint(std::ostream &out, const std::string &s, const Teuchos::Comm< int > &comm)
On Process 0 in the given communicator, print strings from each process in that communicator, in rank order.
Teuchos::ArrayView< const int > getProcsTo() const
Ranks of the processes to which this process will send values.
void createFromSendsAndRecvs(const Teuchos::ArrayView< const int > &exportProcIDs, const Teuchos::ArrayView< const int > &remoteProcIDs)
Set up Distributor using list of process ranks to which to send, and list of process ranks from which...
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
Sets up and executes a communication plan for a Tpetra DistObject.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
Teuchos::ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
size_t getNumSends() const
The number of processes to which we will send data.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Describe this object in a human-readable way to the given output stream.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
Teuchos::RCP< Distributor > getReverse() const
A reverse communication plan Distributor.
EDistributorSendType
The type of MPI send that Distributor should use.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.