#ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
#define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP

#include "Tpetra_Details_DistributorPlan.hpp"

#include "Teuchos_Array.hpp"
#include "Teuchos_Comm.hpp"
#include "Teuchos_RCP.hpp"
#include "Teuchos_Time.hpp"

#include "Kokkos_TeuchosCommAdapters.hpp"

#ifdef HAVE_TPETRA_MPI
#include "mpi.h"
#endif // HAVE_TPETRA_MPI

namespace Tpetra {
namespace Details {

// Compile-time check that both argument types are Kokkos::Views.
template <class View1, class View2>
constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;

// Carries out the communication described by a DistributorPlan.
class DistributorActor {
  static constexpr int DEFAULT_MPI_TAG = 1;

public:
  DistributorActor();
  DistributorActor(const DistributorActor& otherActor);
  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       size_t numPackets,
                       const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                       const ImpView& imports,
                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               size_t numPackets,
               const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
               const ImpView& imports,
               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

  void doWaits(const DistributorPlan& plan);

  bool isReady() const;
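
  // A minimal usage sketch (illustrative only; how the DistributorPlan is
  // built is assumed to happen elsewhere, e.g. inside Tpetra::Distributor):
  //
  //   DistributorActor actor;
  //   actor.doPosts(plan, exports, numPackets, imports); // post recvs/sends
  //   // ... overlap communication with local work ...
  //   actor.doWaits(plan);                               // complete them
  //
  // doPostsAndWaits() simply combines the two calls above.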
private:
#ifdef HAVE_TPETRA_MPI
  template <class ExpView, class ImpView>
  void doPostsAllToAll(const DistributorPlan& plan,
                       const ExpView& exports,
                       size_t numPackets,
                       const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPostsAllToAll(const DistributorPlan& plan,
                       const ExpView& exports,
                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                       const ImpView& imports,
                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  template <class ExpView, class ImpView>
  void doPostsNbrAllToAllV(const DistributorPlan& plan,
                           const ExpView& exports,
                           size_t numPackets,
                           const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPostsNbrAllToAllV(const DistributorPlan& plan,
                           const ExpView& exports,
                           const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                           const ImpView& imports,
                           const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
#endif // HAVE_TPETRACORE_MPI_ADVANCE
#endif // HAVE_TPETRA_MPI

  int mpiTag_;

  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requests_;
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_;
  Teuchos::RCP<Teuchos::Time> timer_doWaits_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_recvs_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_recvs_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_barrier_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_barrier_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_slow_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_slow_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_fast_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_fast_;
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
};
template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       size_t numPackets,
                                       const ImpView& imports)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numPackets, imports);
  doWaits(plan);
}
template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                                       const ImpView& imports,
                                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
  doWaits(plan);
}
template <typename ViewType>
using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;

template <typename DstViewType, typename SrcViewType>
using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
                                                HostAccessibility<SrcViewType>::accessible>;

template <typename DstViewType, typename SrcViewType>
using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
                                                   !HostAccessibility<SrcViewType>::accessible>;
template <typename DstViewType, typename SrcViewType>
enableIfHostAccessible<DstViewType, SrcViewType>
packOffset(const DstViewType& dst,
           const SrcViewType& src,
           const size_t dst_offset,
           const size_t src_offset,
           const size_t size)
{
  memcpy(dst.data()+dst_offset, src.data()+src_offset, size*sizeof(typename DstViewType::value_type));
}
template <typename DstViewType, typename SrcViewType>
enableIfNotHostAccessible<DstViewType, SrcViewType>
packOffset(const DstViewType& dst,
           const SrcViewType& src,
           const size_t dst_offset,
           const size_t src_offset,
           const size_t size)
{
  Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
}
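
// packOffset copies 'size' contiguous entries of 'src' (starting at
// src_offset) into 'dst' (starting at dst_offset).  The overload is chosen
// by SFINAE on host accessibility: if both Views are host-accessible the raw
// memcpy version is used, otherwise Kokkos::Compat::deep_copy_offset handles
// the device copy.  Illustrative sketch (types are examples, not taken from
// this file):
//
//   Kokkos::View<double*, Kokkos::HostSpace> hdst("hdst", n), hsrc("hsrc", n);
//   packOffset(hdst, hsrc, 0, 0, n);   // resolves to the memcpy overload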
#ifdef HAVE_TPETRA_MPI
template <class ExpView, class ImpView>
void DistributorActor::doPostsAllToAll(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       size_t numPackets,
                                       const ImpView& imports) {
  using size_type = Teuchos::Array<size_t>::size_type;

  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  auto comm = plan.getComm();
  const int myRank = comm->getRank();
  std::vector<int> sendcounts(comm->getSize(), 0);
  std::vector<int> sdispls(comm->getSize(), 0);
  std::vector<int> recvcounts(comm->getSize(), 0);
  std::vector<int> rdispls(comm->getSize(), 0);

  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  for (size_t p = 0; p < numBlocks; ++p) {
    sdispls[plan.getProcsTo()[p]] = plan.getStartsTo()[p] * numPackets;
    size_t sendcount = plan.getLengthsTo()[p] * numPackets;
    // Test for overflow: the count must be representable as int.
    TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Send count for block " << p << " (" << sendcount
                               << ") is too large to be represented as int.");
    sendcounts[plan.getProcsTo()[p]] = static_cast<int>(sendcount);
  }

  const size_type actualNumReceives =
      Teuchos::as<size_type>(plan.getNumReceives()) +
      Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
  size_t curBufferOffset = 0;
  for (size_type i = 0; i < actualNumReceives; ++i) {
    const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
    TEUCHOS_TEST_FOR_EXCEPTION(
        curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
        std::logic_error,
        "Tpetra::Distributor::doPosts(3 args, Kokkos): "
        "Exceeded size of 'imports' array in packing loop on Process "
        << myRank << ". imports.size() = " << imports.size()
        << " < curBufferOffset(" << curBufferOffset
        << ") + curBufLen(" << curBufLen << ").");
    rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
    // Test for overflow: the count must be representable as int.
    TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Recv count for receive " << i << " (" << curBufLen
                               << ") is too large to be represented as int.");
    recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(curBufLen);
    curBufferOffset += curBufLen;
  }

  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
    MPIX_Comm* mpixComm = *plan.getMPIXComm();
    TEUCHOS_TEST_FOR_EXCEPTION(
        !mpixComm, std::runtime_error,
        "plan's MPIX_Comm null in doPostsAllToAll, but "
        "DISTRIBUTOR_MPIADVANCE_ALLTOALL set: plan.howInitialized()="
        << DistributorHowInitializedEnumToString(plan.howInitialized()));

    const int err = MPIX_Alltoallv(
        exports.data(), sendcounts.data(), sdispls.data(), rawType,
        imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

    TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                               "MPIX_Alltoallv failed with error \""
                               << Teuchos::mpiErrorCodeToString(err)
                               << "\".");

    return;
  }
#endif // HAVE_TPETRACORE_MPI_ADVANCE

  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
      mpiComm->getRawMpiComm();

  const int err = MPI_Alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPI_Alltoallv failed with error \""
                             << Teuchos::mpiErrorCodeToString(err)
                             << "\".");
}
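
// Note on the all-to-all path above: sendcounts/sdispls and recvcounts/rdispls
// are sized to comm->getSize() and indexed by *rank*, so ranks this process
// exchanges no data with simply keep count 0.  For example (illustrative
// numbers), with numPackets = 2 and a plan that sends 3 entries to rank 4
// starting at export offset 5, the loop sets sdispls[4] = 10 and
// sendcounts[4] = 6.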
#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
template <class ExpView, class ImpView>
void DistributorActor::doPostsNbrAllToAllV(const DistributorPlan& plan,
                                           const ExpView& exports,
                                           size_t numPackets,
                                           const ImpView& imports) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  const int myRank = plan.getComm()->getRank();
  MPIX_Comm* mpixComm = *plan.getMPIXComm();

  const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
  const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
  std::vector<int> sendcounts(numSends, 0);
  std::vector<int> sdispls(numSends, 0);
  std::vector<int> recvcounts(numRecvs, 0);
  std::vector<int> rdispls(numRecvs, 0);

  // Unlike the standard all-to-all, entry i of these arrays refers to the
  // i-th neighbor in the plan, not to MPI rank i.
  for (size_t p = 0; p < numSends; ++p) {
    sdispls[p] = plan.getStartsTo()[p] * numPackets;
    const size_t sendcount = plan.getLengthsTo()[p] * numPackets;
    // Test for overflow: the count must be representable as int.
    TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Send count for block " << p << " (" << sendcount
                               << ") is too large to be represented as int.");
    sendcounts[p] = static_cast<int>(sendcount);
  }

  size_t curBufferOffset = 0;
  for (size_t i = 0; i < numRecvs; ++i) {
    const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
    TEUCHOS_TEST_FOR_EXCEPTION(
        curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
        std::logic_error,
        "Tpetra::Distributor::doPosts(3 args, Kokkos): "
        "Exceeded size of 'imports' array in packing loop on Process "
        << myRank << ". imports.size() = " << imports.size()
        << " < curBufferOffset(" << curBufferOffset
        << ") + curBufLen(" << curBufLen << ").");
    rdispls[i] = curBufferOffset;
    // Test for overflow: the count must be representable as int.
    TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Recv count for receive " << i << " (" << curBufLen
                               << ") is too large to be represented as int.");
    recvcounts[i] = static_cast<int>(curBufLen);
    curBufferOffset += curBufLen;
  }

  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

  const int err = MPIX_Neighbor_alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPIX_Neighbor_alltoallv failed with error \""
                             << Teuchos::mpiErrorCodeToString(err)
                             << "\".");
}
#endif // HAVE_TPETRACORE_MPI_ADVANCE
#endif // HAVE_TPETRA_MPI
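
// doPosts (3-argument variant): every LID carries the same number of packets
// (numPackets).  The function posts the communication described by 'plan':
// with MPI enabled, an all-to-all send type dispatches to the collectives
// above; otherwise nonblocking receives are posted first and sends follow,
// either directly out of 'exports' (fast path) or through a packed send
// buffer (slow path).  doWaits() later completes the posted requests.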
template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               size_t numPackets,
                               const ImpView& imports)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::FancyOStream;
  using Teuchos::includesVerbLevel;
  using Teuchos::ireceive;
  using Teuchos::isend;
  using Teuchos::send;
  using Teuchos::TypeNameTraits;
  using Teuchos::typeName;
  using Kokkos::Compat::create_const_view;
  using Kokkos::Compat::create_view;
  using Kokkos::Compat::subview_offset;
  using Kokkos::Compat::deep_copy_offset;
  typedef Array<size_t>::size_type size_type;
  typedef ExpView exports_view_type;
  typedef ImpView imports_view_type;

#ifdef KOKKOS_ENABLE_CUDA
  static_assert
    (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
     ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
     "Please do not use Tpetra::Distributor with UVM allocations. "
     "See Trilinos GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert
    (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
     ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
     "Please do not use Tpetra::Distributor with SharedUSM allocations. "
     "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
#endif // KOKKOS_ENABLE_SYCL

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMon (*timer_doPosts3KV_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  const int myRank = plan.getComm()->getRank ();

  const Details::EDistributorSendType sendType = plan.getSendType();
#if defined(HAVE_TPETRA_MPI)
  // All-to-all communication has a different layout from point-to-point,
  // so handle it separately and return early.
  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
    doPostsAllToAll(plan, exports, numPackets, imports);
    return;
  }
#ifdef HAVE_TPETRACORE_MPI_ADVANCE
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
    doPostsAllToAll(plan, exports, numPackets, imports);
    return;
  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
    doPostsNbrAllToAllV(plan, exports, numPackets, imports);
    return;
  }
#endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)

#else // HAVE_TPETRA_MPI
  if (plan.hasSelfMessage()) {
    // "Send a message to myself" by copying from the export buffer to the
    // import buffer.
    size_t selfReceiveOffset = 0;
    deep_copy_offset(imports, exports, selfReceiveOffset,
                     plan.getStartsTo()[0]*numPackets,
                     plan.getLengthsTo()[0]*numPackets);
  }
#endif // HAVE_TPETRA_MPI
  size_t selfReceiveOffset = 0;

#ifdef HAVE_TPETRA_DEBUG
  TEUCHOS_TEST_FOR_EXCEPTION
    (requests_.size () != 0,
     std::logic_error,
     "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
     << myRank << ": requests_.size() = " << requests_.size () << " != 0.");
#endif // HAVE_TPETRA_DEBUG

  // requests_.size() counts the outstanding nonblocking messages, so reset it
  // here.  getNumReceives() does not include the self message, if any, hence
  // the extra receive counted below.
  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
    as<size_type> (plan.hasSelfMessage() ? 1 : 0);
  requests_.resize (0);
  // Post the nonblocking receives first.  Common MPI wisdom: posted receives
  // are cheaper to match than unexpected messages.
  {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3KV_recvs_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    size_t curBufferOffset = 0;
    for (size_type i = 0; i < actualNumReceives; ++i) {
      const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
      if (plan.getProcsFrom()[i] != myRank) {
        // Receiving from another process: set up the view of the imports
        // buffer for this message and start the nonblocking receive.
        TEUCHOS_TEST_FOR_EXCEPTION(
          curBufferOffset + curBufLen > static_cast<size_t> (imports.size ()),
          std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
          "Exceeded size of 'imports' array in packing loop on Process " <<
          myRank << ". imports.size() = " << imports.size () << " < "
          "curBufferOffset(" << curBufferOffset << ") + curBufLen(" <<
          curBufLen << ").");
        imports_view_type recvBuf =
          subview_offset (imports, curBufferOffset, curBufLen);
        requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
                                            mpiTag_, *plan.getComm()));
      }
      else { // Receiving from myself; just remember the offset.
        selfReceiveOffset = curBufferOffset;
      }
      curBufferOffset += curBufLen;
    }
  }

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMonSends (*timer_doPosts3KV_sends_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  // Scan getProcsTo() starting just above my rank to help balance traffic.
  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  size_t procIndex = 0;
  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
    ++procIndex;
  }
  if (procIndex == numBlocks) {
    procIndex = 0;
  }

  size_t selfNum = 0;
  size_t selfIndex = 0;

  if (plan.getIndicesTo().is_null()) {

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_fast_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // Data are already blocked (laid out) by process; send straight out of
    // 'exports' without a separate send buffer.
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        exports_view_type tmpSend = subview_offset(
          exports, plan.getStartsTo()[p]*numPackets, plan.getLengthsTo()[p]*numPackets);

        if (sendType == Details::DISTRIBUTOR_ISEND) {
          exports_view_type tmpSendBuf =
            subview_offset (exports, plan.getStartsTo()[p] * numPackets,
                            plan.getLengthsTo()[p] * numPackets);
          requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                           mpiTag_, *plan.getComm()));
        }
        else { // DISTRIBUTOR_SEND
          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], mpiTag_, *plan.getComm());
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
      }
    }

    if (plan.hasSelfMessage()) {
      deep_copy_offset(imports, exports, selfReceiveOffset,
                       plan.getStartsTo()[selfNum]*numPackets,
                       plan.getLengthsTo()[selfNum]*numPackets);
    }
  }
  else { // Data are not blocked by process; pack into a send buffer.
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_slow_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    typedef typename ExpView::non_const_value_type Packet;
    typedef typename ExpView::array_layout Layout;
    typedef typename ExpView::device_type Device;
    typedef typename ExpView::memory_traits Mem;

#ifdef HAVE_TPETRA_DEBUG
    if (sendType != Details::DISTRIBUTOR_SEND) {
      if (plan.getComm()->getRank() == 0)
        std::cout << "The requested Tpetra send type "
                  << DistributorSendTypeEnumToString(sendType)
                  << " requires Distributor data to be ordered by"
                  << " the receiving processor rank.  Since these"
                  << " data are not ordered, Tpetra will use Send"
                  << " instead." << std::endl;
    }
#endif // HAVE_TPETRA_DEBUG

    // This buffer is long enough for the longest send.
    Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
                                                       plan.getMaxSendLength() * numPackets);

    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        size_t sendArrayOffset = 0;
        size_t j = plan.getStartsTo()[p];
        for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
          packOffset(sendArray, exports, sendArrayOffset, plan.getIndicesTo()[j]*numPackets, numPackets);
          sendArrayOffset += numPackets;
        }
        typename ExpView::execution_space().fence();

        ImpView tmpSend =
          subview_offset(sendArray, size_t(0), plan.getLengthsTo()[p]*numPackets);

        send<int> (tmpSend,
                   as<int> (tmpSend.size ()),
                   plan.getProcsTo()[p], mpiTag_, *plan.getComm());
      }
      else { // "Sending" the message to myself
        selfNum = p;
        selfIndex = plan.getStartsTo()[p];
      }
    }

    if (plan.hasSelfMessage()) {
      for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
        packOffset(imports, exports, selfReceiveOffset, plan.getIndicesTo()[selfIndex]*numPackets, numPackets);
        ++selfIndex;
        selfReceiveOffset += numPackets;
      }
    }
  }
}
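
// Summary of the two send paths above (the 4-argument doPosts below follows
// the same pattern): when plan.getIndicesTo() is null, the export data are
// already laid out contiguously per destination process, so sends go straight
// out of 'exports' ("fast path"); otherwise each block is first packed into
// 'sendArray' via packOffset/deep_copy_offset and then sent with a blocking
// send ("slow path").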
#ifdef HAVE_TPETRA_MPI
template <class ExpView, class ImpView>
void DistributorActor::doPostsAllToAll(
    const DistributorPlan& plan, const ExpView& exports,
    const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
    const ImpView& imports,
    const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  using size_type = Teuchos::Array<size_t>::size_type;

  auto comm = plan.getComm();
  std::vector<int> sendcounts(comm->getSize(), 0);
  std::vector<int> sdispls(comm->getSize(), 0);
  std::vector<int> recvcounts(comm->getSize(), 0);
  std::vector<int> rdispls(comm->getSize(), 0);

  size_t curPKToffset = 0;
  for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
    sdispls[plan.getProcsTo()[pp]] = curPKToffset;
    size_t numPackets = 0;
    for (size_t j = plan.getStartsTo()[pp];
         j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
      numPackets += numExportPacketsPerLID[j];
    }
    // Test for overflow: the count must be representable as int.
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Send count for send " << pp << " (" << numPackets
                               << ") is too large to be represented as int.");
    sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
    curPKToffset += numPackets;
  }

  const size_type actualNumReceives =
      Teuchos::as<size_type>(plan.getNumReceives()) +
      Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);

  size_t curBufferOffset = 0;
  size_t curLIDoffset = 0;
  for (size_type i = 0; i < actualNumReceives; ++i) {
    size_t totalPacketsFrom_i = 0;
    for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
      totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
    }
    curLIDoffset += plan.getLengthsFrom()[i];

    rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
    // Test for overflow: the count must be representable as int.
    TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                               std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Recv count for receive " << i << " (" << totalPacketsFrom_i
                               << ") is too large to be represented as int.");
    recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
    curBufferOffset += totalPacketsFrom_i;
  }

  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
      mpiComm->getRawMpiComm();
  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
    MPIX_Comm* mpixComm = *plan.getMPIXComm();
    TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
                               "MPIX_Comm is null in doPostsAllToAll \""
                               << __FILE__ << ":" << __LINE__);

    const int err = MPIX_Alltoallv(
        exports.data(), sendcounts.data(), sdispls.data(), rawType,
        imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

    TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                               "MPIX_Alltoallv failed with error \""
                               << Teuchos::mpiErrorCodeToString(err)
                               << "\".");

    return;
  }
#endif // HAVE_TPETRACORE_MPI_ADVANCE

  const int err = MPI_Alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPI_Alltoallv failed with error \""
                             << Teuchos::mpiErrorCodeToString(err)
                             << "\".");
}
#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
template <class ExpView, class ImpView>
void DistributorActor::doPostsNbrAllToAllV(
    const DistributorPlan& plan, const ExpView& exports,
    const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
    const ImpView& imports,
    const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  const Teuchos_Ordinal numSends = plan.getProcsTo().size();
  const Teuchos_Ordinal numRecvs = plan.getProcsFrom().size();

  auto comm = plan.getComm();
  std::vector<int> sendcounts(numSends, 0);
  std::vector<int> sdispls(numSends, 0);
  std::vector<int> recvcounts(numRecvs, 0);
  std::vector<int> rdispls(numRecvs, 0);

  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
      mpiComm->getRawMpiComm();
  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

  // For the neighbor all-to-all, counts and displacements are indexed by
  // position in the plan's neighbor lists, not by MPI rank.
  size_t curPKToffset = 0;
  for (Teuchos_Ordinal pp = 0; pp < numSends; ++pp) {
    sdispls[pp] = curPKToffset;
    size_t numPackets = 0;
    for (size_t j = plan.getStartsTo()[pp];
         j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
      numPackets += numExportPacketsPerLID[j];
    }
    // Test for overflow: the count must be representable as int.
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Send count for send " << pp << " (" << numPackets
                               << ") is too large to be represented as int.");
    sendcounts[pp] = static_cast<int>(numPackets);
    curPKToffset += numPackets;
  }

  size_t curBufferOffset = 0;
  size_t curLIDoffset = 0;
  for (Teuchos_Ordinal i = 0; i < numRecvs; ++i) {
    size_t totalPacketsFrom_i = 0;
    for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
      totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
    }
    curLIDoffset += plan.getLengthsFrom()[i];

    rdispls[i] = curBufferOffset;
    // Test for overflow: the count must be representable as int.
    TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                               std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Recv count for receive " << i << " (" << totalPacketsFrom_i
                               << ") is too large to be represented as int.");
    recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
    curBufferOffset += totalPacketsFrom_i;
  }

  MPIX_Comm* mpixComm = *plan.getMPIXComm();
  const int err = MPIX_Neighbor_alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPIX_Neighbor_alltoallv failed with error \""
                             << Teuchos::mpiErrorCodeToString(err)
                             << "\".");
}
#endif // HAVE_TPETRACORE_MPI_ADVANCE
#endif // HAVE_TPETRA_MPI
template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                               const ImpView& imports,
                               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::ireceive;
  using Teuchos::isend;
  using Teuchos::send;
  using Teuchos::TypeNameTraits;
  using Kokkos::Compat::create_const_view;
  using Kokkos::Compat::create_view;
  using Kokkos::Compat::subview_offset;
  using Kokkos::Compat::deep_copy_offset;
  typedef Array<size_t>::size_type size_type;
  typedef ExpView exports_view_type;
  typedef ImpView imports_view_type;

#ifdef KOKKOS_ENABLE_CUDA
  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
                 ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
                 "Please do not use Tpetra::Distributor with UVM "
                 "allocations. See GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
                 ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
                 "Please do not use Tpetra::Distributor with SharedUSM "
                 "allocations. See GitHub issue #1088 (corresponding to CUDA).");
#endif // KOKKOS_ENABLE_SYCL

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  const Details::EDistributorSendType sendType = plan.getSendType();
#ifdef HAVE_TPETRA_MPI
  // All-to-all communication has a different layout from point-to-point,
  // so handle it separately and return early.
  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
    doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
    return;
  }
#ifdef HAVE_TPETRACORE_MPI_ADVANCE
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
    doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
    return;
  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
    doPostsNbrAllToAllV(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
    return;
  }
#endif // HAVE_TPETRACORE_MPI_ADVANCE

#else // HAVE_TPETRA_MPI
  if (plan.hasSelfMessage()) {

    size_t selfReceiveOffset = 0;

    // Compute the packet offset and count of each send, so that the
    // self message can be copied directly from 'exports' into 'imports'.
    Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
    size_t maxNumPackets = 0;
    size_t curPKToffset = 0;
    for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
      sendPacketOffsets[pp] = curPKToffset;
      size_t numPackets = 0;
      for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
        numPackets += numExportPacketsPerLID[j];
      }
      if (numPackets > maxNumPackets) maxNumPackets = numPackets;
      packetsPerSend[pp] = numPackets;
      curPKToffset += numPackets;
    }

    deep_copy_offset(imports, exports, selfReceiveOffset,
                     sendPacketOffsets[0], packetsPerSend[0]);
  }
#endif // HAVE_TPETRA_MPI
  const int myProcID = plan.getComm()->getRank ();
  size_t selfReceiveOffset = 0;

#ifdef HAVE_TPETRA_DEBUG
  // Different messages may contain different numbers of packets.
  size_t totalNumImportPackets = 0;
  for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
    totalNumImportPackets += numImportPacketsPerLID[ii];
  }
  TEUCHOS_TEST_FOR_EXCEPTION(
    imports.extent (0) < totalNumImportPackets, std::runtime_error,
    "Tpetra::Distributor::doPosts(4 args, Kokkos): The 'imports' array must have "
    "enough entries to hold the expected number of import packets.  "
    "imports.extent(0) = " << imports.extent (0) << " < "
    "totalNumImportPackets = " << totalNumImportPackets << ".");
  TEUCHOS_TEST_FOR_EXCEPTION
    (requests_.size () != 0, std::logic_error, "Tpetra::Distributor::"
     "doPosts(4 args, Kokkos): Process " << myProcID << ": requests_.size () = "
     << requests_.size () << " != 0.");
#endif // HAVE_TPETRA_DEBUG

  // requests_.size() counts the outstanding nonblocking messages, so reset it
  // here.  getNumReceives() does not include the self message, if any, hence
  // the extra receive counted below.
  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
    as<size_type> (plan.hasSelfMessage() ? 1 : 0);
  requests_.resize (0);
  // Post the nonblocking receives first.  Common MPI wisdom: posted receives
  // are cheaper to match than unexpected messages.
  {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    size_t curBufferOffset = 0;
    size_t curLIDoffset = 0;
    for (size_type i = 0; i < actualNumReceives; ++i) {
      size_t totalPacketsFrom_i = 0;
      for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
        totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
      }
      // Test for overflow: the count must be representable as int.
      TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
        std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
        "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large "
        "to be represented as int.");
      curLIDoffset += plan.getLengthsFrom()[i];
      if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
        // Receiving at least one packet from another process: set up the view
        // of the imports buffer and start the nonblocking receive.
        imports_view_type recvBuf =
          subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
        requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
                                            mpiTag_, *plan.getComm()));
      }
      else { // Receiving from myself; just remember the offset.
        selfReceiveOffset = curBufferOffset;
      }
      curBufferOffset += totalPacketsFrom_i;
    }
  }

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  // Compute the number of packets per send and the offset of each send in
  // the exports array.
  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
  size_t maxNumPackets = 0;
  size_t curPKToffset = 0;
  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
    sendPacketOffsets[pp] = curPKToffset;
    size_t numPackets = 0;
    for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
      numPackets += numExportPacketsPerLID[j];
    }
    if (numPackets > maxNumPackets) maxNumPackets = numPackets;
    // Test for overflow: the count must be representable as int.
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX),
      std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
      "packetsPerSend[" << pp << "] = " << numPackets << " is too large "
      "to be represented as int.");
    packetsPerSend[pp] = numPackets;
    curPKToffset += numPackets;
  }
  // Scan getProcsTo() starting just above my rank to help balance traffic.
  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  size_t procIndex = 0;
  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) {
    ++procIndex;
  }
  if (procIndex == numBlocks) {
    procIndex = 0;
  }

  size_t selfNum = 0;
  size_t selfIndex = 0;
  if (plan.getIndicesTo().is_null()) {

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // Data are already blocked (laid out) by process; send straight out of
    // 'exports' without a separate send buffer.
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) {
        exports_view_type tmpSend =
          subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);

        if (sendType == Details::DISTRIBUTOR_ISEND) {
          exports_view_type tmpSendBuf =
            subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
          requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                           mpiTag_, *plan.getComm()));
        }
        else { // DISTRIBUTOR_SEND
          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], mpiTag_, *plan.getComm());
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
      }
    }

    if (plan.hasSelfMessage()) {
      deep_copy_offset(imports, exports, selfReceiveOffset,
                       sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
    }
  }
  else { // Data are not blocked by process; pack into a send buffer.
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    typedef typename ExpView::non_const_value_type Packet;
    typedef typename ExpView::array_layout Layout;
    typedef typename ExpView::device_type Device;
    typedef typename ExpView::memory_traits Mem;

#ifdef HAVE_TPETRA_DEBUG
    if (sendType != Details::DISTRIBUTOR_SEND) {
      if (plan.getComm()->getRank() == 0)
        std::cout << "The requested Tpetra send type "
                  << DistributorSendTypeEnumToString(sendType)
                  << " requires Distributor data to be ordered by"
                  << " the receiving processor rank.  Since these"
                  << " data are not ordered, Tpetra will use Send"
                  << " instead." << std::endl;
    }
#endif // HAVE_TPETRA_DEBUG

    // This buffer is long enough for the longest send.
    Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
                                                       maxNumPackets);

    // Compute the offset of each LID's packets within the exports array.
    Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
    size_t ioffset = 0;
    for (int j=0; j<numExportPacketsPerLID.size(); ++j) {
      indicesOffsets[j] = ioffset;
      ioffset += numExportPacketsPerLID[j];
    }

    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myProcID) {
        size_t sendArrayOffset = 0;
        size_t j = plan.getStartsTo()[p];
        size_t numPacketsTo_p = 0;
        for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
          numPacketsTo_p += numExportPacketsPerLID[j];
          deep_copy_offset(sendArray, exports, sendArrayOffset,
                           indicesOffsets[j], numExportPacketsPerLID[j]);
          sendArrayOffset += numExportPacketsPerLID[j];
        }
        typename ExpView::execution_space().fence();

        if (numPacketsTo_p > 0) {
          ImpView tmpSend =
            subview_offset(sendArray, size_t(0), numPacketsTo_p);

          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], mpiTag_, *plan.getComm());
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
        selfIndex = plan.getStartsTo()[p];
      }
    }

    if (plan.hasSelfMessage()) {
      for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
        deep_copy_offset(imports, exports, selfReceiveOffset,
                         indicesOffsets[selfIndex],
                         numExportPacketsPerLID[selfIndex]);
        selfReceiveOffset += numExportPacketsPerLID[selfIndex];
        ++selfIndex;
      }
    }
  }
}