#ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
#define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP

#include <climits>
#include <cstring>
#include <vector>

#include "Tpetra_Details_DistributorPlan.hpp"

#include "Teuchos_Array.hpp"
#include "Teuchos_Comm.hpp"
#include "Teuchos_RCP.hpp"
#include "Teuchos_Time.hpp"

#include "Kokkos_TeuchosCommAdapters.hpp"

#ifdef HAVE_TPETRA_MPI
#include "mpi.h"
#endif // HAVE_TPETRA_MPI
template <class View1, class View2>
constexpr bool areKokkosViews =
    Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;
class DistributorActor {
  static constexpr int DEFAULT_MPI_TAG = 1;

public:
  DistributorActor();
  DistributorActor(const DistributorActor& otherActor);
  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       size_t numPackets,
                       const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                       const ImpView& imports,
                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               size_t numPackets,
               const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
               const ImpView& imports,
               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

  void doWaits(const DistributorPlan& plan);

  void doWaitsRecv(const DistributorPlan& plan);

  void doWaitsSend(const DistributorPlan& plan);
private:
#ifdef HAVE_TPETRA_MPI
  template <class ExpView, class ImpView>
  void doPostsAllToAll(const DistributorPlan& plan,
                       const ExpView& exports,
                       size_t numPackets,
                       const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPostsAllToAll(const DistributorPlan& plan,
                       const ExpView& exports,
                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                       const ImpView& imports,
                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  template <class ExpView, class ImpView>
  void doPostsNbrAllToAllV(const DistributorPlan& plan,
                           const ExpView& exports,
                           size_t numPackets,
                           const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPostsNbrAllToAllV(const DistributorPlan& plan,
                           const ExpView& exports,
                           const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                           const ImpView& imports,
                           const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
#endif // HAVE_TPETRACORE_MPI_ADVANCE
#endif // HAVE_TPETRA_MPI
  int mpiTag_ = DEFAULT_MPI_TAG;  // tag used for all point-to-point messages

  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsRecv_;
  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsSend_;

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_;
  Teuchos::RCP<Teuchos::Time> timer_doWaitsRecv_;
  Teuchos::RCP<Teuchos::Time> timer_doWaitsSend_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_recvs_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_recvs_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_barrier_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_barrier_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_slow_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_slow_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_fast_;
  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_fast_;
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
};
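
// Usage sketch (illustrative only, not part of this header): a
// DistributorActor carries out the communication described by a
// DistributorPlan.  The view element type, the extents, and the name
// `constNumPackets` below are assumptions made for the example.
//
//   DistributorActor actor;
//   // `plan` is a DistributorPlan describing who sends what to whom.
//   Kokkos::View<double*> exports("exports", numExportEntries);
//   Kokkos::View<double*> imports("imports", numImportEntries);
//   // Post sends and receives, then block until both complete:
//   actor.doPostsAndWaits(plan, exports, constNumPackets, imports);
//   // Or split the phases to overlap communication with local work:
//   actor.doPosts(plan, exports, constNumPackets, imports);
//   // ... local computation ...
//   actor.doWaits(plan);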
template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       size_t numPackets,
                                       const ImpView& imports)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numPackets, imports);
  doWaits(plan);
}
template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                                       const ImpView& imports,
                                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
  doWaits(plan);
}
template <typename ViewType>
using HostAccessibility =
    Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace,
                               typename ViewType::memory_space>;

template <typename DstViewType, typename SrcViewType>
using enableIfHostAccessible =
    std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
                     HostAccessibility<SrcViewType>::accessible>;

template <typename DstViewType, typename SrcViewType>
using enableIfNotHostAccessible =
    std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
                     !HostAccessibility<SrcViewType>::accessible>;
// Copy `size` elements from src (starting at src_offset) into dst (starting
// at dst_offset) when both Views are host-accessible.
template <typename DstViewType, typename SrcViewType>
enableIfHostAccessible<DstViewType, SrcViewType>
packOffset(const DstViewType& dst,
           const SrcViewType& src,
           const size_t dst_offset,
           const size_t src_offset,
           const size_t size)
{
  memcpy((void*) (dst.data() + dst_offset), src.data() + src_offset,
         size * sizeof(typename DstViewType::value_type));
}
// Fallback for Views that are not host-accessible: use a device-aware copy.
template <typename DstViewType, typename SrcViewType>
enableIfNotHostAccessible<DstViewType, SrcViewType>
packOffset(const DstViewType& dst,
           const SrcViewType& src,
           const size_t dst_offset,
           const size_t src_offset,
           const size_t size)
{
  Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
}
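
// Example of the dispatch above (illustrative only; the buffer names and
// sizes are assumptions).  Two host-accessible Views select the memcpy
// overload; mixing in a View whose memory space is not host-accessible
// selects the deep_copy_offset overload instead.
//
//   Kokkos::View<int*, Kokkos::HostSpace> h_dst("h_dst", 100), h_src("h_src", 100);
//   packOffset(h_dst, h_src, /*dst_offset=*/10, /*src_offset=*/0, /*size=*/5);
//
//   Kokkos::View<int*> d_dst("d_dst", 100);  // default (possibly device) memory space
//   packOffset(d_dst, h_src, 10, 0, 5);      // falls back to deep_copy_offset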
#ifdef HAVE_TPETRA_MPI
template <class ExpView, class ImpView>
void DistributorActor::doPostsAllToAll(const DistributorPlan &plan,
                                       const ExpView &exports,
                                       size_t numPackets,
                                       const ImpView &imports) {
  using size_type = Teuchos::Array<size_t>::size_type;

  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  auto comm = plan.getComm();
  const int myRank = comm->getRank();
  std::vector<int> sendcounts(comm->getSize(), 0);
  std::vector<int> sdispls(comm->getSize(), 0);
  std::vector<int> recvcounts(comm->getSize(), 0);
  std::vector<int> rdispls(comm->getSize(), 0);

  // Fill send counts and displacements, one entry per destination process.
  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  for (size_t p = 0; p < numBlocks; ++p) {
    sdispls[plan.getProcsTo()[p]] = plan.getStartsTo()[p] * numPackets;
    size_t sendcount = plan.getLengthsTo()[p] * numPackets;
    TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Send count for block " << p << " (" << sendcount
                               << ") is too large "
                               "to be represented as int.");
    sendcounts[plan.getProcsTo()[p]] = static_cast<int>(sendcount);
  }

  // Fill receive counts and displacements, one entry per source process.
  const size_type actualNumReceives =
      Teuchos::as<size_type>(plan.getNumReceives()) +
      Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
  size_t curBufferOffset = 0;
  for (size_type i = 0; i < actualNumReceives; ++i) {
    const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
    TEUCHOS_TEST_FOR_EXCEPTION(
        curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
        std::logic_error,
        "Tpetra::Distributor::doPosts(3 args, Kokkos): "
        "Exceeded size of 'imports' array in packing loop on Process "
        << myRank << ". imports.size() = " << imports.size() << " < "
        "curBufferOffset(" << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
    rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
    TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Recv count for receive " << i << " (" << curBufLen
                               << ") is too large "
                               "to be represented as int.");
    recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(curBufLen);
    curBufferOffset += curBufLen;
  }

  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
    MPIX_Comm *mpixComm = *plan.getMPIXComm();
    TEUCHOS_TEST_FOR_EXCEPTION(
        !mpixComm, std::runtime_error,
        "plan's MPIX_Comm null in doPostsAllToAll, but "
        "DISTRIBUTOR_MPIADVANCE_ALLTOALL set: plan.howInitialized()="
        << DistributorHowInitializedEnumToString(plan.howInitialized()));

    const int err = MPIX_Alltoallv(
        exports.data(), sendcounts.data(), sdispls.data(), rawType,
        imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

    TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                               "MPIX_Alltoallv failed with error \""
                               << Teuchos::mpiErrorCodeToString(err)
                               << "\".");
    return;
  }
#endif // HAVE_TPETRACORE_MPI_ADVANCE

  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
      mpiComm->getRawMpiComm();

  const int err = MPI_Alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPI_Alltoallv failed with error \""
                             << Teuchos::mpiErrorCodeToString(err)
                             << "\".");
}
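
// Worked example of the count/displacement setup above (numbers are
// hypothetical): with numPackets = 2, a plan on rank 0 that sends blocks of
// lengths {3, 1} starting at offsets {0, 3} to procs {1, 2} produces
//
//   sendcounts = { 0, 6, 2, ... }   // indexed by destination rank
//   sdispls    = { 0, 0, 6, ... }   // start of each block in `exports`
//
// The receive side is filled the same way from getLengthsFrom() and
// getProcsFrom(), so a single MPI_Alltoallv moves every block at once.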
#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
template <class ExpView, class ImpView>
void DistributorActor::doPostsNbrAllToAllV(const DistributorPlan &plan,
                                           const ExpView &exports,
                                           size_t numPackets,
                                           const ImpView &imports) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  const int myRank = plan.getComm()->getRank();
  MPIX_Comm *mpixComm = *plan.getMPIXComm();

  // Neighbor alltoallv counts are indexed by neighbor, not by global rank.
  const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
  const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
  std::vector<int> sendcounts(numSends, 0);
  std::vector<int> sdispls(numSends, 0);
  std::vector<int> recvcounts(numRecvs, 0);
  std::vector<int> rdispls(numRecvs, 0);

  for (size_t p = 0; p < numSends; ++p) {
    sdispls[p] = plan.getStartsTo()[p] * numPackets;
    const size_t sendcount = plan.getLengthsTo()[p] * numPackets;
    TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Send count for block " << p << " (" << sendcount
                               << ") is too large "
                               "to be represented as int.");
    sendcounts[p] = static_cast<int>(sendcount);
  }

  size_t curBufferOffset = 0;
  for (size_t i = 0; i < numRecvs; ++i) {
    const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
    TEUCHOS_TEST_FOR_EXCEPTION(
        curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
        std::logic_error,
        "Tpetra::Distributor::doPosts(3 args, Kokkos): "
        "Exceeded size of 'imports' array in packing loop on Process "
        << myRank << ". imports.size() = " << imports.size() << " < "
        "curBufferOffset(" << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
    rdispls[i] = curBufferOffset;
    TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(3 args, Kokkos): "
                               "Recv count for receive " << i << " (" << curBufLen
                               << ") is too large "
                               "to be represented as int.");
    recvcounts[i] = static_cast<int>(curBufLen);
    curBufferOffset += curBufLen;
  }

  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

  const int err = MPIX_Neighbor_alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPIX_Neighbor_alltoallv failed with error \""
                             << Teuchos::mpiErrorCodeToString(err)
                             << "\".");
}
#endif // HAVE_TPETRACORE_MPI_ADVANCE
#endif // HAVE_TPETRA_MPI
template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               size_t numPackets,
                               const ImpView& imports)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::FancyOStream;
  using Teuchos::includesVerbLevel;
  using Teuchos::ireceive;
  using Teuchos::isend;
  using Teuchos::send;
  using Teuchos::TypeNameTraits;
  using Teuchos::typeName;
  using Kokkos::Compat::create_const_view;
  using Kokkos::Compat::create_view;
  using Kokkos::Compat::subview_offset;
  using Kokkos::Compat::deep_copy_offset;
  typedef Array<size_t>::size_type size_type;
  typedef ExpView exports_view_type;
  typedef ImpView imports_view_type;
#ifdef KOKKOS_ENABLE_CUDA
  static_assert(! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
                ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
                "Please do not use Tpetra::Distributor with UVM allocations.  "
                "See Trilinos GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert(! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
                ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
                "Please do not use Tpetra::Distributor with SharedUSM allocations.  "
                "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
#endif // KOKKOS_ENABLE_SYCL
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMon (*timer_doPosts3KV_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

  const int myRank = plan.getComm()->getRank ();
  const Details::EDistributorSendType sendType = plan.getSendType();
#if defined(HAVE_TPETRA_MPI)
  // All-to-all send types dispatch to a single collective call and return early.
  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
    doPostsAllToAll(plan, exports, numPackets, imports);
    return;
  }
#ifdef HAVE_TPETRACORE_MPI_ADVANCE
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
    doPostsAllToAll(plan, exports, numPackets, imports);
    return;
  }
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
    doPostsNbrAllToAllV(plan, exports, numPackets, imports);
    return;
  }
#endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)
#else // HAVE_TPETRA_MPI
  if (plan.hasSelfMessage()) {
    // "Sending a message to ourself" is just a copy from the export buffer
    // into the import buffer.
    size_t selfReceiveOffset = 0;
    deep_copy_offset(imports, exports, selfReceiveOffset,
                     plan.getStartsTo()[0]*numPackets,
                     plan.getLengthsTo()[0]*numPackets);
  }
#endif // HAVE_TPETRA_MPI
  size_t selfReceiveOffset = 0;

#ifdef HAVE_TPETRA_DEBUG
  TEUCHOS_TEST_FOR_EXCEPTION
    (requestsRecv_.size () != 0, std::logic_error,
     "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
     << myRank << ": requestsRecv_.size() = " << requestsRecv_.size () << " != 0.");
  TEUCHOS_TEST_FOR_EXCEPTION
    (requestsSend_.size () != 0, std::logic_error,
     "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
     << myRank << ": requestsSend_.size() = " << requestsSend_.size () << " != 0.");
#endif // HAVE_TPETRA_DEBUG

  // Clear the request arrays before posting new messages.
  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
    as<size_type> (plan.hasSelfMessage() ? 1 : 0);
  requestsRecv_.resize (0);
  requestsSend_.resize (0);
  // Post the nonblocking receives before any sends.
  {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3KV_recvs_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    size_t curBufferOffset = 0;
    for (size_type i = 0; i < actualNumReceives; ++i) {
      const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
      if (plan.getProcsFrom()[i] != myRank) {
        // Set up the view of the imports buffer for this message and start
        // the nonblocking receive.
        TEUCHOS_TEST_FOR_EXCEPTION(
          curBufferOffset + curBufLen > static_cast<size_t> (imports.size ()),
          std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
          "Exceeded size of 'imports' array in packing loop on Process " <<
          myRank << ". imports.size() = " << imports.size () << " < "
          "curBufferOffset(" << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
        imports_view_type recvBuf =
          subview_offset (imports, curBufferOffset, curBufLen);
        requestsRecv_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
                                                mpiTag_, *plan.getComm()));
      }
      else { // Receiving from myself
        selfReceiveOffset = curBufferOffset;
      }
      curBufferOffset += curBufLen;
    }
  }
  {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends (*timer_doPosts3KV_sends_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // Walk the destination list starting just past my own rank, so that
    // message traffic is spread across processes.
    size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
    size_t procIndex = 0;
    while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
      ++procIndex;
    }
    if (procIndex == numBlocks) {
      procIndex = 0;
    }

    size_t selfNum = 0;
    size_t selfIndex = 0;

    if (plan.getIndicesTo().is_null()) {
      // Fast path: exports are already blocked by destination process, so
      // each message can be sent directly from the exports buffer.
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
      Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_fast_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

      for (size_t i = 0; i < numBlocks; ++i) {
        size_t p = i + procIndex;
        if (p > (numBlocks - 1)) {
          p -= numBlocks;
        }

        if (plan.getProcsTo()[p] != myRank) {
          exports_view_type tmpSend = subview_offset(
            exports, plan.getStartsTo()[p]*numPackets, plan.getLengthsTo()[p]*numPackets);

          if (sendType == Details::DISTRIBUTOR_ISEND) {
            exports_view_type tmpSendBuf =
              subview_offset (exports, plan.getStartsTo()[p] * numPackets,
                              plan.getLengthsTo()[p] * numPackets);
            requestsSend_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                                 mpiTag_, *plan.getComm()));
          }
          else { // blocking send
            send<int> (tmpSend,
                       as<int> (tmpSend.size ()),
                       plan.getProcsTo()[p], mpiTag_, *plan.getComm());
          }
        }
        else { // "Sending" to myself is handled below with a local copy
          selfNum = i;
        }
      }

      if (plan.hasSelfMessage()) {
        // Copy the self-message directly from exports to imports.
        deep_copy_offset(imports, exports, selfReceiveOffset,
                         plan.getStartsTo()[selfNum]*numPackets,
                         plan.getLengthsTo()[selfNum]*numPackets);
      }
    }
    else {
      // Slow path: exports are not blocked by destination process, so each
      // message is first packed into a contiguous send buffer.
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
      Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_slow_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

      typedef typename ExpView::non_const_value_type Packet;
      typedef typename ExpView::array_layout Layout;
      typedef typename ExpView::device_type Device;
      typedef typename ExpView::memory_traits Mem;

#ifdef HAVE_TPETRA_DEBUG
      if (sendType != Details::DISTRIBUTOR_SEND) {
        if (plan.getComm()->getRank() == 0)
          std::cout << "The requested Tpetra send type "
                    << DistributorSendTypeEnumToString(sendType)
                    << " requires Distributor data to be ordered by"
                    << " the receiving processor rank.  Since these"
                    << " data are not ordered, Tpetra will use Send"
                    << " instead." << std::endl;
      }
#endif // HAVE_TPETRA_DEBUG

      Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
                                                         plan.getMaxSendLength() * numPackets);

      for (size_t i = 0; i < numBlocks; ++i) {
        size_t p = i + procIndex;
        if (p > (numBlocks - 1)) {
          p -= numBlocks;
        }

        if (plan.getProcsTo()[p] != myRank) {
          // Pack this block's entries into the contiguous send buffer.
          size_t sendArrayOffset = 0;
          size_t j = plan.getStartsTo()[p];
          for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
            packOffset(sendArray, exports, sendArrayOffset, plan.getIndicesTo()[j]*numPackets, numPackets);
            sendArrayOffset += numPackets;
          }
          typename ExpView::execution_space().fence();

          auto tmpSend =
            subview_offset(sendArray, size_t(0), plan.getLengthsTo()[p]*numPackets);

          send<int> (tmpSend,
                     as<int> (tmpSend.size ()),
                     plan.getProcsTo()[p], mpiTag_, *plan.getComm());
        }
        else { // "Sending" to myself
          selfNum = i;
          selfIndex = plan.getStartsTo()[p];
        }
      }

      if (plan.hasSelfMessage()) {
        for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
          packOffset(imports, exports, selfReceiveOffset, plan.getIndicesTo()[selfIndex]*numPackets, numPackets);
          ++selfIndex;
          selfReceiveOffset += numPackets;
        }
      }
    }
  }
}
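
// Layout illustration for the doPosts(3 args) fast path above (numbers are
// hypothetical): with numPackets = 2 and getIndicesTo() null, exports for a
// plan that sends blocks of lengths {2, 3} to procs {1, 3} is already grouped
// by destination,
//
//   exports = [ b0_0 b0_1 b0_2 b0_3 | b1_0 b1_1 b1_2 b1_3 b1_4 b1_5 ]
//               '---- to proc 1 ---'  '-------- to proc 3 --------'
//
// so each send is a subview of `exports`.  When getIndicesTo() is non-null,
// the entries are scattered and are first packed into `sendArray`.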
#ifdef HAVE_TPETRA_MPI
template <class ExpView, class ImpView>
void DistributorActor::doPostsAllToAll(
    const DistributorPlan &plan, const ExpView &exports,
    const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
    const ImpView &imports,
    const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  using size_type = Teuchos::Array<size_t>::size_type;

  auto comm = plan.getComm();
  std::vector<int> sendcounts(comm->getSize(), 0);
  std::vector<int> sdispls(comm->getSize(), 0);
  std::vector<int> recvcounts(comm->getSize(), 0);
  std::vector<int> rdispls(comm->getSize(), 0);

  // Variable-size packets: accumulate per-destination counts from
  // numExportPacketsPerLID.
  size_t curPKToffset = 0;
  for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
    sdispls[plan.getProcsTo()[pp]] = curPKToffset;
    size_t numPackets = 0;
    for (size_t j = plan.getStartsTo()[pp];
         j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
      numPackets += numExportPacketsPerLID[j];
    }
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Send count for send " << pp << " (" << numPackets
                               << ") is too large "
                               "to be represented as int.");
    sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
    curPKToffset += numPackets;
  }

  const size_type actualNumReceives =
      Teuchos::as<size_type>(plan.getNumReceives()) +
      Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);

  size_t curBufferOffset = 0;
  size_t curLIDoffset = 0;
  for (size_type i = 0; i < actualNumReceives; ++i) {
    size_t totalPacketsFrom_i = 0;
    for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
      totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
    }
    curLIDoffset += plan.getLengthsFrom()[i];

    rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
    TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                               std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Recv count for receive " << i << " (" << totalPacketsFrom_i
                               << ") is too large "
                               "to be represented as int.");
    recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
    curBufferOffset += totalPacketsFrom_i;
  }

  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
      mpiComm->getRawMpiComm();
  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
    MPIX_Comm *mpixComm = *plan.getMPIXComm();
    TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
                               "MPIX_Comm is null in doPostsAllToAll \""
                               << __FILE__ << ":" << __LINE__);

    const int err = MPIX_Alltoallv(
        exports.data(), sendcounts.data(), sdispls.data(), rawType,
        imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

    TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                               "MPIX_Alltoallv failed with error \""
                               << Teuchos::mpiErrorCodeToString(err)
                               << "\".");
    return;
  }
#endif // HAVE_TPETRACORE_MPI_ADVANCE

  const int err = MPI_Alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPI_Alltoallv failed with error \""
                             << Teuchos::mpiErrorCodeToString(err)
                             << "\".");
}
#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
template <class ExpView, class ImpView>
void DistributorActor::doPostsNbrAllToAllV(
    const DistributorPlan &plan, const ExpView &exports,
    const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
    const ImpView &imports,
    const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  const Teuchos_Ordinal numSends = plan.getProcsTo().size();
  const Teuchos_Ordinal numRecvs = plan.getProcsFrom().size();

  auto comm = plan.getComm();
  std::vector<int> sendcounts(numSends, 0);
  std::vector<int> sdispls(numSends, 0);
  std::vector<int> recvcounts(numRecvs, 0);
  std::vector<int> rdispls(numRecvs, 0);

  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
      mpiComm->getRawMpiComm();
  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

  // Counts and displacements are indexed by neighbor, not by global rank.
  size_t curPKToffset = 0;
  for (Teuchos_Ordinal pp = 0; pp < numSends; ++pp) {
    sdispls[pp] = curPKToffset;
    size_t numPackets = 0;
    for (size_t j = plan.getStartsTo()[pp];
         j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
      numPackets += numExportPacketsPerLID[j];
    }
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Send count for send " << pp << " (" << numPackets
                               << ") is too large "
                               "to be represented as int.");
    sendcounts[pp] = static_cast<int>(numPackets);
    curPKToffset += numPackets;
  }

  size_t curBufferOffset = 0;
  size_t curLIDoffset = 0;
  for (Teuchos_Ordinal i = 0; i < numRecvs; ++i) {
    size_t totalPacketsFrom_i = 0;
    for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
      totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
    }
    curLIDoffset += plan.getLengthsFrom()[i];

    rdispls[i] = curBufferOffset;
    TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                               std::logic_error,
                               "Tpetra::Distributor::doPosts(4 args, Kokkos): "
                               "Recv count for receive " << i << " (" << totalPacketsFrom_i
                               << ") is too large "
                               "to be represented as int.");
    recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
    curBufferOffset += totalPacketsFrom_i;
  }

  MPIX_Comm *mpixComm = *plan.getMPIXComm();
  const int err = MPIX_Neighbor_alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPIX_Neighbor_alltoallv failed with error \""
                             << Teuchos::mpiErrorCodeToString(err)
                             << "\".");
}
#endif // HAVE_TPETRACORE_MPI_ADVANCE
#endif // HAVE_TPETRA_MPI
template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView &exports,
                               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                               const ImpView &imports,
                               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::ireceive;
  using Teuchos::isend;
  using Teuchos::send;
  using Teuchos::TypeNameTraits;
  using Kokkos::Compat::create_const_view;
  using Kokkos::Compat::create_view;
  using Kokkos::Compat::subview_offset;
  using Kokkos::Compat::deep_copy_offset;
  typedef Array<size_t>::size_type size_type;
  typedef ExpView exports_view_type;
  typedef ImpView imports_view_type;

#ifdef KOKKOS_ENABLE_CUDA
  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
                 ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
                 "Please do not use Tpetra::Distributor with UVM "
                 "allocations. See GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
                 ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
                 "Please do not use Tpetra::Distributor with SharedUSM "
                 "allocations. See GitHub issue #1088 (corresponding to CUDA).");
#endif // KOKKOS_ENABLE_SYCL

#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
  const Details::EDistributorSendType sendType = plan.getSendType();

#ifdef HAVE_TPETRA_MPI
  // All-to-all send types dispatch to a single collective call and return early.
  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
    doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
    return;
  }
#ifdef HAVE_TPETRACORE_MPI_ADVANCE
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL)
  {
    doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
    return;
  }
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
    doPostsNbrAllToAllV(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
    return;
  }
#endif // HAVE_TPETRACORE_MPI_ADVANCE
#else // HAVE_TPETRA_MPI
  if (plan.hasSelfMessage()) {
    size_t selfReceiveOffset = 0;

    // Compute the packet offset of each send so the self-message block can
    // be located in the exports buffer.
    Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
    size_t maxNumPackets = 0;
    size_t curPKToffset = 0;
    for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
      sendPacketOffsets[pp] = curPKToffset;
      size_t numPackets = 0;
      for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
        numPackets += numExportPacketsPerLID[j];
      }
      if (numPackets > maxNumPackets) maxNumPackets = numPackets;
      packetsPerSend[pp] = numPackets;
      curPKToffset += numPackets;
    }

    deep_copy_offset(imports, exports, selfReceiveOffset,
                     sendPacketOffsets[0], packetsPerSend[0]);
  }
#endif // HAVE_TPETRA_MPI
  const int myProcID = plan.getComm()->getRank ();
  size_t selfReceiveOffset = 0;

#ifdef HAVE_TPETRA_DEBUG
  // Different messages may have different numbers of packets.
  size_t totalNumImportPackets = 0;
  for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
    totalNumImportPackets += numImportPacketsPerLID[ii];
  }
  TEUCHOS_TEST_FOR_EXCEPTION(
    imports.extent (0) < totalNumImportPackets, std::runtime_error,
    "Tpetra::Distributor::doPosts(4 args, Kokkos): The 'imports' array must have "
    "enough entries to hold the expected number of import packets.  "
    "imports.extent(0) = " << imports.extent (0) << " < "
    "totalNumImportPackets = " << totalNumImportPackets << ".");
  TEUCHOS_TEST_FOR_EXCEPTION
    (requestsRecv_.size () != 0, std::logic_error, "Tpetra::Distributor::"
     "doPosts(4 args, Kokkos): Process " << myProcID << ": requestsRecv_.size () = "
     << requestsRecv_.size () << " != 0.");
  TEUCHOS_TEST_FOR_EXCEPTION
    (requestsSend_.size () != 0, std::logic_error, "Tpetra::Distributor::"
     "doPosts(4 args, Kokkos): Process " << myProcID << ": requestsSend_.size () = "
     << requestsSend_.size () << " != 0.");
#endif // HAVE_TPETRA_DEBUG

  // Clear the request arrays before posting new messages.
  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
    as<size_type> (plan.hasSelfMessage() ? 1 : 0);
  requestsRecv_.resize (0);
  requestsSend_.resize (0);
  // Post the nonblocking receives before any sends.
  {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    size_t curBufferOffset = 0;
    size_t curLIDoffset = 0;
    for (size_type i = 0; i < actualNumReceives; ++i) {
      size_t totalPacketsFrom_i = 0;
      for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
        totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
      }
      TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
        std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
        "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large "
        "to be represented as int.");
      curLIDoffset += plan.getLengthsFrom()[i];
      if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
        // Not a self-receive and nonempty: post the nonblocking receive.
        imports_view_type recvBuf =
          subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
        requestsRecv_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
                                                mpiTag_, *plan.getComm()));
      }
      else { // Receiving from myself
        selfReceiveOffset = curBufferOffset;
      }
      curBufferOffset += totalPacketsFrom_i;
    }
  }
  {
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
    Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

    // Compute the offset and packet count of each send block.
    Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
    size_t maxNumPackets = 0;
    size_t curPKToffset = 0;
    for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
      sendPacketOffsets[pp] = curPKToffset;
      size_t numPackets = 0;
      for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
        numPackets += numExportPacketsPerLID[j];
      }
      if (numPackets > maxNumPackets) maxNumPackets = numPackets;
      TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX),
        std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
        "packetsPerSend[" << pp << "] = " << numPackets << " is too large "
        "to be represented as int.");
      packetsPerSend[pp] = numPackets;
      curPKToffset += numPackets;
    }
    // Walk the destination list starting just past my own rank, so that
    // message traffic is spread across processes.
    size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
    size_t procIndex = 0;
    while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) {
      ++procIndex;
    }
    if (procIndex == numBlocks) {
      procIndex = 0;
    }

    size_t selfNum = 0;
    size_t selfIndex = 0;
    if (plan.getIndicesTo().is_null()) {
      // Fast path: exports are already blocked by destination process, so
      // each message can be sent directly from the exports buffer.
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
      Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

      for (size_t i = 0; i < numBlocks; ++i) {
        size_t p = i + procIndex;
        if (p > (numBlocks - 1)) {
          p -= numBlocks;
        }

        if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) {
          exports_view_type tmpSend =
            subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);

          if (sendType == Details::DISTRIBUTOR_ISEND) {
            exports_view_type tmpSendBuf =
              subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
            requestsSend_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
                                                 mpiTag_, *plan.getComm()));
          }
          else { // blocking send
            send<int> (tmpSend,
                       as<int> (tmpSend.size ()),
                       plan.getProcsTo()[p], mpiTag_, *plan.getComm());
          }
        }
        else { // "Sending" to myself is handled below with a local copy
          selfNum = i;
        }
      }

      if (plan.hasSelfMessage()) {
        deep_copy_offset(imports, exports, selfReceiveOffset,
                         sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
      }
    }
    else {
      // Slow path: exports are not blocked by destination process, so each
      // message is first packed into a contiguous send buffer.
#ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
      Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_);
#endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS

      typedef typename ExpView::non_const_value_type Packet;
      typedef typename ExpView::array_layout Layout;
      typedef typename ExpView::device_type Device;
      typedef typename ExpView::memory_traits Mem;

#ifdef HAVE_TPETRA_DEBUG
      if (sendType != Details::DISTRIBUTOR_SEND) {
        if (plan.getComm()->getRank() == 0)
          std::cout << "The requested Tpetra send type "
                    << DistributorSendTypeEnumToString(sendType)
                    << " requires Distributor data to be ordered by"
                    << " the receiving processor rank.  Since these"
                    << " data are not ordered, Tpetra will use Send"
                    << " instead." << std::endl;
      }
#endif // HAVE_TPETRA_DEBUG

      Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
                                                         maxNumPackets);

      // Compute the offset of each LID's packets in the exports buffer.
      Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
      size_t ioffset = 0;
      for (int j=0; j<numExportPacketsPerLID.size(); ++j) {
        indicesOffsets[j] = ioffset;
        ioffset += numExportPacketsPerLID[j];
      }

      for (size_t i = 0; i < numBlocks; ++i) {
        size_t p = i + procIndex;
        if (p > (numBlocks - 1)) {
          p -= numBlocks;
        }

        if (plan.getProcsTo()[p] != myProcID) {
          // Pack this block's entries into the contiguous send buffer.
          size_t sendArrayOffset = 0;
          size_t j = plan.getStartsTo()[p];
          size_t numPacketsTo_p = 0;
          for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
            numPacketsTo_p += numExportPacketsPerLID[j];
            deep_copy_offset(sendArray, exports, sendArrayOffset,
                             indicesOffsets[j], numExportPacketsPerLID[j]);
            sendArrayOffset += numExportPacketsPerLID[j];
          }
          typename ExpView::execution_space().fence();

          if (numPacketsTo_p > 0) {
            auto tmpSend =
              subview_offset(sendArray, size_t(0), numPacketsTo_p);

            send<int> (tmpSend,
                       as<int> (tmpSend.size ()),
                       plan.getProcsTo()[p], mpiTag_, *plan.getComm());
          }
        }
        else { // "Sending" to myself
          selfNum = i;
          selfIndex = plan.getStartsTo()[p];
        }
      }

      if (plan.hasSelfMessage()) {
        for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
          deep_copy_offset(imports, exports, selfReceiveOffset,
                           indicesOffsets[selfIndex],
                           numExportPacketsPerLID[selfIndex]);
          selfReceiveOffset += numExportPacketsPerLID[selfIndex];
          ++selfIndex;
        }
      }
    }
  }
}