#ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
#define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP

#include "Tpetra_Details_DistributorPlan.hpp"

#include "Teuchos_Array.hpp"
#include "Teuchos_Comm.hpp"
#include "Teuchos_RCP.hpp"
#include "Teuchos_Time.hpp"

#include "Kokkos_TeuchosCommAdapters.hpp"

#ifdef HAVE_TPETRA_MPI
#include "mpi.h"
#endif // HAVE_TPETRA_MPI
template <class View1, class View2>
constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;
class DistributorActor {
  static constexpr int DEFAULT_MPI_TAG = 1;

public:
  DistributorActor();
  DistributorActor(const DistributorActor& otherActor);
  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       size_t numPackets,
                       const ImpView& imports);
  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                       const ImpView& imports,
                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               size_t numPackets,
               const ImpView& imports);
  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
               const ImpView& imports,
               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

  void doWaits(const DistributorPlan& plan);
private:
#ifdef HAVE_TPETRA_MPI
  template <class ExpView, class ImpView>
  void doPostsAllToAll(const DistributorPlan &plan,
                       const ExpView &exports,
                       size_t numPackets,
                       const ImpView &imports);

  template <class ExpView, class ImpView>
  void doPostsAllToAll(const DistributorPlan &plan,
                       const ExpView &exports,
                       const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
                       const ImpView &imports,
                       const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  template <class ExpView, class ImpView>
  void doPostsNbrAllToAllV(const DistributorPlan &plan,
                           const ExpView &exports,
                           size_t numPackets,
                           const ImpView &imports);

  template <class ExpView, class ImpView>
  void doPostsNbrAllToAllV(const DistributorPlan &plan,
                           const ExpView &exports,
                           const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
                           const ImpView &imports,
                           const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
#endif // HAVE_TPETRACORE_MPI_ADVANCE
#endif // HAVE_TPETRA_MPI
  int mpiTag_;

  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requests_;

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_;
  Teuchos::RCP<Teuchos::Time> timer_doWaits_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_recvs_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_recvs_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_barrier_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_barrier_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_slow_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_slow_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_fast_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_fast_;
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
};
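// The sketch below shows how a DistributorActor is typically driven by a
// DistributorPlan.  It is illustrative only: the plan construction is elided,
// and the view names, element type, extents, and packet count are
// hypothetical, not part of this header.
//
//   // DistributorPlan plan = ...;  // describes who sends/receives what
//   DistributorActor actor;
//   Kokkos::View<double*> exports ("exports", numExportEntries);
//   Kokkos::View<double*> imports ("imports", numImportEntries);
//   const size_t numPackets = 1;  // packets per entry (hypothetical)
//
//   // Post sends/receives and block until they complete:
//   actor.doPostsAndWaits (plan, exports, numPackets, imports);
//
//   // Or split the two phases to overlap communication with other work:
//   actor.doPosts (plan, exports, numPackets, imports);
//   // ... other work ...
//   actor.doWaits (plan);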
template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       size_t numPackets,
                                       const ImpView& imports)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numPackets, imports);
  doWaits(plan);
}
template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                                       const ImpView& imports,
                                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
  doWaits(plan);
}
template <typename ViewType>
using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;

template <typename DstViewType, typename SrcViewType>
using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
                                                HostAccessibility<SrcViewType>::accessible>;

template <typename DstViewType, typename SrcViewType>
using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
                                                   !HostAccessibility<SrcViewType>::accessible>;
template <typename DstViewType, typename SrcViewType>
enableIfHostAccessible<DstViewType, SrcViewType>
packOffset(const DstViewType& dst,
           const SrcViewType& src,
           const size_t dst_offset,
           const size_t src_offset,
           const size_t size)
{
  memcpy((void*) (dst.data() + dst_offset), src.data() + src_offset,
         size * sizeof(typename DstViewType::value_type));
}

template <typename DstViewType, typename SrcViewType>
enableIfNotHostAccessible<DstViewType, SrcViewType>
packOffset(const DstViewType& dst,
           const SrcViewType& src,
           const size_t dst_offset,
           const size_t src_offset,
           const size_t size)
{
  Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
}
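// Both packOffset overloads copy 'size' contiguous elements from
// src(src_offset .. src_offset+size-1) into dst starting at dst_offset; the
// enable_if aliases above select memcpy when both Views are host accessible
// and Kokkos::Compat::deep_copy_offset otherwise.  A hedged call-site sketch
// (the views and sizes here are hypothetical):
//
//   Kokkos::View<double*> dst ("dst", 100);
//   Kokkos::View<double*> src ("src", 100);
//   packOffset (dst, src, 5, 20, 10);  // copies src(20..29) into dst(5..14)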
#ifdef HAVE_TPETRA_MPI
template <class ExpView, class ImpView>
void DistributorActor::doPostsAllToAll(const DistributorPlan &plan,
                                       const ExpView &exports,
                                       size_t numPackets,
                                       const ImpView &imports) {
  using size_type = Teuchos::Array<size_t>::size_type;

  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  auto comm = plan.getComm();
  const int myRank = comm->getRank();
  std::vector<int> sendcounts(comm->getSize(), 0);
  std::vector<int> sdispls(comm->getSize(), 0);
  std::vector<int> recvcounts(comm->getSize(), 0);
  std::vector<int> rdispls(comm->getSize(), 0);

  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  for (size_t p = 0; p < numBlocks; ++p) {
    sdispls[plan.getProcsTo()[p]] = plan.getStartsTo()[p] * numPackets;
    size_t sendcount = plan.getLengthsTo()[p] * numPackets;
    TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Send count for block "
                                   << p << " (" << sendcount << ") is too large "
                                   "to be represented as int.");
    sendcounts[plan.getProcsTo()[p]] = static_cast<int>(sendcount);
  }
  const size_type actualNumReceives =
      Teuchos::as<size_type>(plan.getNumReceives()) +
      Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
  size_t curBufferOffset = 0;
  for (size_type i = 0; i < actualNumReceives; ++i) {
    const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
    TEUCHOS_TEST_FOR_EXCEPTION(
        curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
        std::logic_error,
        "Tpetra::Distributor::doPosts(3 args, Kokkos): "
        "Exceeded size of 'imports' array in packing loop on Process "
            << myRank << ". imports.size() = " << imports.size() << " < "
            "curBufferOffset(" << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
    rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
    TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Recv count for receive "
                                   << i << " (" << curBufLen << ") is too large "
                                   "to be represented as int.");
    recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(curBufLen);
    curBufferOffset += curBufLen;
  }

  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
    MPIX_Comm *mpixComm = *plan.getMPIXComm();
    TEUCHOS_TEST_FOR_EXCEPTION(
        !mpixComm, std::runtime_error,
        "plan's MPIX_Comm null in doPostsAllToAll, but "
        "DISTRIBUTOR_MPIADVANCE_ALLTOALL set: plan.howInitialized()="
            << DistributorHowInitializedEnumToString(plan.howInitialized()));

    const int err = MPIX_Alltoallv(
        exports.data(), sendcounts.data(), sdispls.data(), rawType,
        imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

    TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                               "MPIX_Alltoallv failed with error \""
                                   << Teuchos::mpiErrorCodeToString(err)
                                   << "\".");
    return;
  }
#endif // HAVE_TPETRACORE_MPI_ADVANCE

  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
      mpiComm->getRawMpiComm();

  const int err = MPI_Alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPI_Alltoallv failed with error \""
                                 << Teuchos::mpiErrorCodeToString(err)
                                 << "\".");
}
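// To illustrate the count/displacement layout built above, here is a hedged,
// hypothetical example.  Suppose this rank sends to ranks 0 and 2 with
// getStartsTo() = {0, 3}, getLengthsTo() = {3, 2}, and numPackets = 1 on a
// 4-rank communicator.  The send loop then produces
//
//   sendcounts = {3, 0, 2, 0}   // entries sent to each rank
//   sdispls    = {0, 0, 3, 0}   // offsets into 'exports' for each rank
//
// and the receive loop fills recvcounts/rdispls the same way from
// getLengthsFrom()/getProcsFrom(), so MPI_Alltoallv (or MPIX_Alltoallv)
// can scatter 'exports' and gather into 'imports' in one collective call.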
#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
template <class ExpView, class ImpView>
void DistributorActor::doPostsNbrAllToAllV(const DistributorPlan &plan,
                                           const ExpView &exports,
                                           size_t numPackets,
                                           const ImpView &imports) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  const int myRank = plan.getComm()->getRank();
  MPIX_Comm *mpixComm = *plan.getMPIXComm();

  const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
  const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
  std::vector<int> sendcounts(numSends, 0);
  std::vector<int> sdispls(numSends, 0);
  std::vector<int> recvcounts(numRecvs, 0);
  std::vector<int> rdispls(numRecvs, 0);
  for (size_t p = 0; p < numSends; ++p) {
    sdispls[p] = plan.getStartsTo()[p] * numPackets;
    const size_t sendcount = plan.getLengthsTo()[p] * numPackets;
    TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Send count for block "
                                   << p << " (" << sendcount << ") is too large "
                                   "to be represented as int.");
    sendcounts[p] = static_cast<int>(sendcount);
  }

  size_t curBufferOffset = 0;
  for (size_t i = 0; i < numRecvs; ++i) {
    const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
    TEUCHOS_TEST_FOR_EXCEPTION(
        curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
        std::logic_error,
        "Tpetra::Distributor::doPosts(3 args, Kokkos): "
        "Exceeded size of 'imports' array in packing loop on Process "
            << myRank << ". imports.size() = " << imports.size() << " < "
            "curBufferOffset(" << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
    rdispls[i] = curBufferOffset;
    TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Recv count for receive "
                                   << i << " (" << curBufLen << ") is too large "
                                   "to be represented as int.");
    recvcounts[i] = static_cast<int>(curBufLen);
    curBufferOffset += curBufLen;
  }

  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
  const int err = MPIX_Neighbor_alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPIX_Neighbor_alltoallv failed with error \""
                                 << Teuchos::mpiErrorCodeToString(err)
                                 << "\".");
}
#endif // HAVE_TPETRACORE_MPI_ADVANCE
#endif // HAVE_TPETRA_MPI
template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               size_t numPackets,
                               const ImpView& imports)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::FancyOStream;
  using Teuchos::includesVerbLevel;
  using Teuchos::ireceive;
  using Teuchos::isend;
  using Teuchos::send;
  using Teuchos::TypeNameTraits;
  using Teuchos::typeName;

  using Kokkos::Compat::create_const_view;
  using Kokkos::Compat::create_view;
  using Kokkos::Compat::subview_offset;
  using Kokkos::Compat::deep_copy_offset;

  typedef Array<size_t>::size_type size_type;
  typedef ExpView exports_view_type;
  typedef ImpView imports_view_type;
#ifdef KOKKOS_ENABLE_CUDA
  static_assert
    (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
     ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
     "Please do not use Tpetra::Distributor with UVM allocations. "
     "See Trilinos GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert
    (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
     ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
     "Please do not use Tpetra::Distributor with SharedUSM allocations. "
     "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
#endif // KOKKOS_ENABLE_SYCL
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMon (*timer_doPosts3KV_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  const int myRank = plan.getComm()->getRank ();
  const Details::EDistributorSendType sendType = plan.getSendType();

#if defined(HAVE_TPETRA_MPI)
  // An all-to-all send type is handled by a single collective call
  // instead of the point-to-point posts below.
  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
    doPostsAllToAll(plan, exports, numPackets, imports);
    return;
  }
#ifdef HAVE_TPETRACORE_MPI_ADVANCE
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
    doPostsAllToAll(plan, exports, numPackets, imports);
    return;
  }
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
    doPostsNbrAllToAllV(plan, exports, numPackets, imports);
    return;
  }
#endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)

#else // HAVE_TPETRA_MPI
  if (plan.hasSelfMessage()) {
    // Without MPI, a "self message" is just a copy from the export
    // buffer into the import buffer.
    size_t selfReceiveOffset = 0;
    deep_copy_offset(imports, exports, selfReceiveOffset,
                     plan.getStartsTo()[0]*numPackets,
                     plan.getLengthsTo()[0]*numPackets);
  }
#endif // HAVE_TPETRA_MPI
  size_t selfReceiveOffset = 0;

#ifdef HAVE_TPETRA_DEBUG
  TEUCHOS_TEST_FOR_EXCEPTION
    (requests_.size () != 0, std::logic_error,
     "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
     << myRank << ": requests_.size() = " << requests_.size () << " != 0.");
#endif // HAVE_TPETRA_DEBUG

  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
    as<size_type> (plan.hasSelfMessage() ? 1 : 0);
  requests_.resize (0);

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3KV_recvs_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  size_t curBufferOffset = 0;
  for (size_type i = 0; i < actualNumReceives; ++i) {
    const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
    if (plan.getProcsFrom()[i] != myRank) {
      // Post a nonblocking receive for data from this remote process.
      TEUCHOS_TEST_FOR_EXCEPTION(
        curBufferOffset + curBufLen > static_cast<size_t> (imports.size ()),
        std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
        "Exceeded size of 'imports' array in packing loop on Process " <<
        myRank << ". imports.size() = " << imports.size () << " < "
        "curBufferOffset(" << curBufferOffset << ") + curBufLen(" <<
        curBufLen << ").");
      imports_view_type recvBuf =
        subview_offset (imports, curBufferOffset, curBufLen);
      requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
                                          mpiTag_, *plan.getComm()));
    }
    else { // receiving from myself; copy directly below
      selfReceiveOffset = curBufferOffset;
    }
    curBufferOffset += curBufLen;
  }

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMonSends (*timer_doPosts3KV_sends_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  size_t procIndex = 0;
  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
    ++procIndex;
  }
  if (procIndex == numBlocks) {
    procIndex = 0;
  }

  size_t selfNum = 0;
  size_t selfIndex = 0;

  if (plan.getIndicesTo().is_null()) {

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_fast_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        exports_view_type tmpSend = subview_offset(
          exports, plan.getStartsTo()[p]*numPackets, plan.getLengthsTo()[p]*numPackets);

        if (sendType == Details::DISTRIBUTOR_ISEND) {
          exports_view_type tmpSendBuf =
            subview_offset (exports, plan.getStartsTo()[p] * numPackets,
                            plan.getLengthsTo()[p] * numPackets);
          requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                           mpiTag_, *plan.getComm()));
        }
        else { // blocking send
          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], mpiTag_, *plan.getComm());
        }
      }
      else { // "sending" the block to myself; copy it below
        selfNum = p;
      }
    }

    if (plan.hasSelfMessage()) {
      deep_copy_offset(imports, exports, selfReceiveOffset,
                       plan.getStartsTo()[selfNum]*numPackets,
                       plan.getLengthsTo()[selfNum]*numPackets);
    }
  }
  else { // plan.getIndicesTo() is not null: the "slow" path

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_slow_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    typedef typename ExpView::non_const_value_type Packet;
    typedef typename ExpView::array_layout Layout;
    typedef typename ExpView::device_type Device;
    typedef typename ExpView::memory_traits Mem;

#ifdef HAVE_TPETRA_DEBUG
    if (sendType != Details::DISTRIBUTOR_SEND) {
      if (plan.getComm()->getRank() == 0)
        std::cout << "The requested Tpetra send type "
                  << DistributorSendTypeEnumToString(sendType)
                  << " requires Distributor data to be ordered by"
                  << " the receiving processor rank. Since these"
                  << " data are not ordered, Tpetra will use Send"
                  << " instead." << std::endl;
    }
#endif // HAVE_TPETRA_DEBUG

    Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
                                                       plan.getMaxSendLength() * numPackets);
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        size_t sendArrayOffset = 0;
        size_t j = plan.getStartsTo()[p];
        for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
          packOffset(sendArray, exports, sendArrayOffset, plan.getIndicesTo()[j]*numPackets, numPackets);
          sendArrayOffset += numPackets;
        }
        typename ExpView::execution_space().fence();

        Kokkos::View<Packet*, Layout, Device, Mem> tmpSend =
          subview_offset(sendArray, size_t(0), plan.getLengthsTo()[p]*numPackets);

        send<int> (tmpSend,
                   as<int> (tmpSend.size ()),
                   plan.getProcsTo()[p], mpiTag_, *plan.getComm());
      }
      else { // "sending" the block to myself
        selfIndex = plan.getStartsTo()[p];
      }
    }

    if (plan.hasSelfMessage()) {
      for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
        packOffset(imports, exports, selfReceiveOffset, plan.getIndicesTo()[selfIndex]*numPackets, numPackets);
        ++selfIndex;
        selfReceiveOffset += numPackets;
      }
    }
  }
}
#ifdef HAVE_TPETRA_MPI
template <class ExpView, class ImpView>
void DistributorActor::doPostsAllToAll(
    const DistributorPlan &plan,
    const ExpView &exports,
    const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
    const ImpView &imports,
    const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  using size_type = Teuchos::Array<size_t>::size_type;

  auto comm = plan.getComm();
  std::vector<int> sendcounts(comm->getSize(), 0);
  std::vector<int> sdispls(comm->getSize(), 0);
  std::vector<int> recvcounts(comm->getSize(), 0);
  std::vector<int> rdispls(comm->getSize(), 0);
  size_t curPKToffset = 0;
  for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
    sdispls[plan.getProcsTo()[pp]] = curPKToffset;
    size_t numPackets = 0;
    for (size_t j = plan.getStartsTo()[pp];
         j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
      numPackets += numExportPacketsPerLID[j];
    }
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Send count for send "
                                   << pp << " (" << numPackets << ") is too large "
                                   "to be represented as int.");
    sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
    curPKToffset += numPackets;
  }
  const size_type actualNumReceives =
      Teuchos::as<size_type>(plan.getNumReceives()) +
      Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);

  size_t curBufferOffset = 0;
  size_t curLIDoffset = 0;
  for (size_type i = 0; i < actualNumReceives; ++i) {
    size_t totalPacketsFrom_i = 0;
    for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
      totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
    }
    curLIDoffset += plan.getLengthsFrom()[i];

    rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
    TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                               std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Recv count for receive "
                                   << i << " (" << totalPacketsFrom_i << ") is too large "
                                   "to be represented as int.");
    recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
    curBufferOffset += totalPacketsFrom_i;
  }
  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
      mpiComm->getRawMpiComm();
  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
    MPIX_Comm *mpixComm = *plan.getMPIXComm();
    TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
                               "MPIX_Comm is null in doPostsAllToAll \""
                                   << __FILE__ << ":" << __LINE__);

    const int err = MPIX_Alltoallv(
        exports.data(), sendcounts.data(), sdispls.data(), rawType,
        imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

    TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                               "MPIX_Alltoallv failed with error \""
                                   << Teuchos::mpiErrorCodeToString(err)
                                   << "\".");
    return;
  }
#endif // HAVE_TPETRACORE_MPI_ADVANCE

  const int err = MPI_Alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPI_Alltoallv failed with error \""
                                 << Teuchos::mpiErrorCodeToString(err)
                                 << "\".");
}
#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
template <class ExpView, class ImpView>
void DistributorActor::doPostsNbrAllToAllV(
    const DistributorPlan &plan,
    const ExpView &exports,
    const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
    const ImpView &imports,
    const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  const Teuchos_Ordinal numSends = plan.getProcsTo().size();
  const Teuchos_Ordinal numRecvs = plan.getProcsFrom().size();

  auto comm = plan.getComm();
  std::vector<int> sendcounts(numSends, 0);
  std::vector<int> sdispls(numSends, 0);
  std::vector<int> recvcounts(numRecvs, 0);
  std::vector<int> rdispls(numRecvs, 0);

  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
      mpiComm->getRawMpiComm();
  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
  size_t curPKToffset = 0;
  for (Teuchos_Ordinal pp = 0; pp < numSends; ++pp) {
    sdispls[pp] = curPKToffset;
    size_t numPackets = 0;
    for (size_t j = plan.getStartsTo()[pp];
         j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
      numPackets += numExportPacketsPerLID[j];
    }
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Send count for send "
                                   << pp << " (" << numPackets << ") is too large "
                                   "to be represented as int.");
    sendcounts[pp] = static_cast<int>(numPackets);
    curPKToffset += numPackets;
  }

  size_t curBufferOffset = 0;
  size_t curLIDoffset = 0;
  for (Teuchos_Ordinal i = 0; i < numRecvs; ++i) {
    size_t totalPacketsFrom_i = 0;
    for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
      totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
    }
    curLIDoffset += plan.getLengthsFrom()[i];

    rdispls[i] = curBufferOffset;
    TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                               std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Recv count for receive "
                                   << i << " (" << totalPacketsFrom_i << ") is too large "
                                   "to be represented as int.");
    recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
    curBufferOffset += totalPacketsFrom_i;
  }
  MPIX_Comm *mpixComm = *plan.getMPIXComm();
  const int err = MPIX_Neighbor_alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPIX_Neighbor_alltoallv failed with error \""
                                 << Teuchos::mpiErrorCodeToString(err)
                                 << "\".");
}
#endif // HAVE_TPETRACORE_MPI_ADVANCE
#endif // HAVE_TPETRA_MPI
template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView &exports,
                               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                               const ImpView &imports,
                               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::ireceive;
  using Teuchos::isend;
  using Teuchos::send;
  using Teuchos::TypeNameTraits;

  using Kokkos::Compat::create_const_view;
  using Kokkos::Compat::create_view;
  using Kokkos::Compat::subview_offset;
  using Kokkos::Compat::deep_copy_offset;

  typedef Array<size_t>::size_type size_type;
  typedef ExpView exports_view_type;
  typedef ImpView imports_view_type;
#ifdef KOKKOS_ENABLE_CUDA
  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
                 ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
                 "Please do not use Tpetra::Distributor with UVM "
                 "allocations. See GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
                 ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
                 "Please do not use Tpetra::Distributor with SharedUSM "
                 "allocations. See GitHub issue #1088 (corresponding to CUDA).");
#endif // KOKKOS_ENABLE_SYCL

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  const Details::EDistributorSendType sendType = plan.getSendType();

#ifdef HAVE_TPETRA_MPI
  // An all-to-all send type is handled by a single collective call
  // instead of the point-to-point posts below.
  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
    doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
    return;
  }
#ifdef HAVE_TPETRACORE_MPI_ADVANCE
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
    doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
    return;
  }
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
    doPostsNbrAllToAllV(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
    return;
  }
#endif // HAVE_TPETRACORE_MPI_ADVANCE

#else // HAVE_TPETRA_MPI
  if (plan.hasSelfMessage()) {
    // Without MPI, a "self message" is just a copy from the export
    // buffer into the import buffer.
    size_t selfReceiveOffset = 0;

    Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
    size_t maxNumPackets = 0;
    size_t curPKToffset = 0;
    for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
      sendPacketOffsets[pp] = curPKToffset;
      size_t numPackets = 0;
      for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
        numPackets += numExportPacketsPerLID[j];
      }
      if (numPackets > maxNumPackets) maxNumPackets = numPackets;
      packetsPerSend[pp] = numPackets;
      curPKToffset += numPackets;
    }

    deep_copy_offset(imports, exports, selfReceiveOffset,
                     sendPacketOffsets[0], packetsPerSend[0]);
  }
#endif // HAVE_TPETRA_MPI
  const int myProcID = plan.getComm()->getRank ();
  size_t selfReceiveOffset = 0;

#ifdef HAVE_TPETRA_DEBUG
  size_t totalNumImportPackets = 0;
  for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
    totalNumImportPackets += numImportPacketsPerLID[ii];
  }
  TEUCHOS_TEST_FOR_EXCEPTION(
    imports.extent (0) < totalNumImportPackets, std::runtime_error,
    "Tpetra::Distributor::doPosts(4 args, Kokkos): The 'imports' array must have "
    "enough entries to hold the expected number of import packets. "
    "imports.extent(0) = " << imports.extent (0) << " < "
    "totalNumImportPackets = " << totalNumImportPackets << ".");
  TEUCHOS_TEST_FOR_EXCEPTION
    (requests_.size () != 0, std::logic_error, "Tpetra::Distributor::"
     "doPosts(4 args, Kokkos): Process " << myProcID << ": requests_.size () = "
     << requests_.size () << " != 0.");
#endif // HAVE_TPETRA_DEBUG

  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
    as<size_type> (plan.hasSelfMessage() ? 1 : 0);
  requests_.resize (0);

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  size_t curBufferOffset = 0;
  size_t curLIDoffset = 0;
  for (size_type i = 0; i < actualNumReceives; ++i) {
    size_t totalPacketsFrom_i = 0;
    for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
      totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
    }
    TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                               std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large "
                               "to be represented as int.");
    curLIDoffset += plan.getLengthsFrom()[i];
    if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
      // Post a nonblocking receive for data from this remote process.
      imports_view_type recvBuf =
        subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
      requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
                                          mpiTag_, *plan.getComm()));
    }
    else { // receiving from myself, or nothing to receive
      selfReceiveOffset = curBufferOffset;
    }
    curBufferOffset += totalPacketsFrom_i;
  }

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
  size_t maxNumPackets = 0;
  size_t curPKToffset = 0;
  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
    sendPacketOffsets[pp] = curPKToffset;
    size_t numPackets = 0;
    for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
      numPackets += numExportPacketsPerLID[j];
    }
    if (numPackets > maxNumPackets) maxNumPackets = numPackets;
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX),
                               std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "packetsPerSend[" << pp << "] = " << numPackets << " is too large "
                               "to be represented as int.");
    packetsPerSend[pp] = numPackets;
    curPKToffset += numPackets;
  }
  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  size_t procIndex = 0;
  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) {
    ++procIndex;
  }
  if (procIndex == numBlocks) {
    procIndex = 0;
  }

  size_t selfNum = 0;
  size_t selfIndex = 0;
  if (plan.getIndicesTo().is_null()) {

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) {
        exports_view_type tmpSend =
          subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);

        if (sendType == Details::DISTRIBUTOR_ISEND) {
          exports_view_type tmpSendBuf =
            subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
          requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                           mpiTag_, *plan.getComm()));
        }
        else { // blocking send
          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], mpiTag_, *plan.getComm());
        }
      }
      else { // "sending" the block to myself; copy it below
        selfNum = p;
      }
    }

    if (plan.hasSelfMessage()) {
      deep_copy_offset(imports, exports, selfReceiveOffset,
                       sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
    }
  }
  else { // plan.getIndicesTo() is not null: the "slow" path

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    typedef typename ExpView::non_const_value_type Packet;
    typedef typename ExpView::array_layout Layout;
    typedef typename ExpView::device_type Device;
    typedef typename ExpView::memory_traits Mem;

#ifdef HAVE_TPETRA_DEBUG
    if (sendType != Details::DISTRIBUTOR_SEND) {
      if (plan.getComm()->getRank() == 0)
        std::cout << "The requested Tpetra send type "
                  << DistributorSendTypeEnumToString(sendType)
                  << " requires Distributor data to be ordered by"
                  << " the receiving processor rank. Since these"
                  << " data are not ordered, Tpetra will use Send"
                  << " instead." << std::endl;
    }
#endif // HAVE_TPETRA_DEBUG

    Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray", maxNumPackets);

    Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
    size_t ioffset = 0;
    for (int j=0; j<numExportPacketsPerLID.size(); ++j) {
      indicesOffsets[j] = ioffset;
      ioffset += numExportPacketsPerLID[j];
    }
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myProcID) {
        size_t sendArrayOffset = 0;
        size_t j = plan.getStartsTo()[p];
        size_t numPacketsTo_p = 0;
        for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
          numPacketsTo_p += numExportPacketsPerLID[j];
          deep_copy_offset(sendArray, exports, sendArrayOffset,
                           indicesOffsets[j], numExportPacketsPerLID[j]);
          sendArrayOffset += numExportPacketsPerLID[j];
        }
        typename ExpView::execution_space().fence();

        if (numPacketsTo_p > 0) {
          Kokkos::View<Packet*, Layout, Device, Mem> tmpSend =
            subview_offset(sendArray, size_t(0), numPacketsTo_p);

          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], mpiTag_, *plan.getComm());
        }
      }
      else { // "sending" the block to myself
        selfIndex = plan.getStartsTo()[p];
      }
    }

    if (plan.hasSelfMessage()) {
      for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
        deep_copy_offset(imports, exports, selfReceiveOffset,
                         indicesOffsets[selfIndex],
                         numExportPacketsPerLID[selfIndex]);
        selfReceiveOffset += numExportPacketsPerLID[selfIndex];
        ++selfIndex;
      }
    }
  }
}

#endif // TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP