Tpetra parallel linear algebra  Version of the Day
Tpetra_Details_DistributorActor.hpp
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 // clang-format off
11 
12 #ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
13 #define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
14 
16 #include "Tpetra_Util.hpp"
17 
18 #include "Teuchos_Array.hpp"
19 #include "Teuchos_Comm.hpp"
21 #include "Teuchos_RCP.hpp"
22 #include "Teuchos_Time.hpp"
23 
24 #include "Kokkos_TeuchosCommAdapters.hpp"
25 #include "Kokkos_StdAlgorithms.hpp"
26 
27 #ifdef HAVE_TPETRA_MPI
28 #include "mpi.h"
29 #endif
30 
31 namespace Tpetra {
32 namespace Details {
33 
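// Convenience variable template: true exactly when both template arguments
// are Kokkos::View types. The doPosts*/doPostsAndWaits* definitions below
// use it in static_asserts to reject non-View buffer arguments.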
34 template <class View1, class View2>
35 constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;
36 
37 class DistributorActor {
38  static constexpr int DEFAULT_MPI_TAG = 1;
39 
40 public:
41  DistributorActor();
42  DistributorActor(const DistributorActor& otherActor);
43 
44  template <class ExpView, class ImpView>
45  void doPostsAndWaits(const DistributorPlan& plan,
46  const ExpView &exports,
47  size_t numPackets,
48  const ImpView &imports);
49 
50  template <class ExpView, class ImpView>
51  void doPostsAndWaits(const DistributorPlan& plan,
52  const ExpView &exports,
53  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
54  const ImpView &imports,
55  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
56 
57  template <class ExpView, class ExpPacketsView, class ImpView, class ImpPacketsView>
58  void doPostsAndWaitsKokkos(const DistributorPlan& plan,
59  const ExpView &exports,
60  const ExpPacketsView &numExportPacketsPerLID,
61  const ImpView &imports,
62  const ImpPacketsView &numImportPacketsPerLID);
63 
64  template <class ExpView, class ImpView>
65  void doPosts(const DistributorPlan& plan,
66  const ExpView& exports,
67  size_t numPackets,
68  const ImpView& imports);
69 
70  template <class ExpView, class ImpView>
71  void doPosts(const DistributorPlan& plan,
72  const ExpView &exports,
73  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
74  const ImpView &imports,
75  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
76 
77  template <class ExpView, class ExpPacketsView, class ImpView, class ImpPacketsView>
78  void doPostsKokkos(const DistributorPlan& plan,
79  const ExpView &exports,
80  const ExpPacketsView &numExportPacketsPerLID,
81  const ImpView &imports,
82  const ImpPacketsView &numImportPacketsPerLID);
83 
84  template <class ExpView, class ExpPacketsView, class ImpView, class ImpPacketsView>
85  void doPostsAllToAllKokkos(
86  const DistributorPlan &plan, const ExpView &exports,
87  const ExpPacketsView &numExportPacketsPerLID,
88  const ImpView &imports,
89  const ImpPacketsView &numImportPacketsPerLID);
90 
91  template <class ExpView, class ExpPacketsView, class ImpView, class ImpPacketsView>
92  void doPostsNbrAllToAllVKokkos(
93  const DistributorPlan &plan, const ExpView &exports,
94  const ExpPacketsView &numExportPacketsPerLID,
95  const ImpView &imports,
96  const ImpPacketsView &numImportPacketsPerLID);
97 
98  void doWaits(const DistributorPlan& plan);
99 
100  bool isReady() const;
101 
102 private:
103 // clang-format on
104 #ifdef HAVE_TPETRA_MPI
105  template <class ExpView, class ImpView>
106  void doPostsAllToAll(const DistributorPlan &plan, const ExpView &exports,
107  size_t numPackets, const ImpView &imports);
108 
109  template <class ExpView, class ImpView>
110  void doPostsAllToAll(
111  const DistributorPlan &plan, const ExpView &exports,
112  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
113  const ImpView &imports,
114  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
115 
116 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
117  template <class ExpView, class ImpView>
118  void doPostsNbrAllToAllV(const DistributorPlan &plan, const ExpView &exports,
119  size_t numPackets, const ImpView &imports);
120 
121  template <class ExpView, class ImpView>
122  void doPostsNbrAllToAllV(
123  const DistributorPlan &plan, const ExpView &exports,
124  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
125  const ImpView &imports,
126  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
127 #endif // HAVE_TPETRACORE_MPI_ADVANCE
128 #endif // HAVE_TPETRA_MPI
129  // clang-format off
130  int mpiTag_;
131 
132  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requests_;
133 
134 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
135  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_;
136  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_;
137  Teuchos::RCP<Teuchos::Time> timer_doWaits_;
138  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_recvs_;
139  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_recvs_;
140  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_barrier_;
141  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_barrier_;
142  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_;
143  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_;
144  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_slow_;
145  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_slow_;
146  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_fast_;
147  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_fast_;
148 
150  void makeTimers();
151 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
152 };
153 
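// Usage sketch (illustrative only, not part of the original header): a caller
// that already has a DistributorPlan and matching Kokkos::View buffers can
// either post and wait in one call,
//
//   DistributorActor actor;
//   actor.doPostsAndWaits(plan, exports, numPackets, imports);
//
// or post, overlap other work, and wait later:
//
//   actor.doPosts(plan, exports, numPackets, imports);
//   // ... overlap computation here ...
//   actor.doWaits(plan);
//
// As defined below, doPostsAndWaits is exactly doPosts followed by doWaits.
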
154 template <class ExpView, class ImpView>
155 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
156  const ExpView& exports,
157  size_t numPackets,
158  const ImpView& imports)
159 {
160  static_assert(areKokkosViews<ExpView, ImpView>,
161  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
162  doPosts(plan, exports, numPackets, imports);
163  doWaits(plan);
164 }
165 
166 template <class ExpView, class ImpView>
167 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
168  const ExpView& exports,
169  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
170  const ImpView& imports,
171  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
172 {
173  static_assert(areKokkosViews<ExpView, ImpView>,
174  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
175  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
176  doWaits(plan);
177 }
178 
179 
180 template <class ExpView, class ExpPacketsView, class ImpView, class ImpPacketsView>
181 void DistributorActor::doPostsAndWaitsKokkos(const DistributorPlan& plan,
182  const ExpView &exports,
183  const ExpPacketsView &numExportPacketsPerLID,
184  const ImpView &imports,
185  const ImpPacketsView &numImportPacketsPerLID)
186 {
187  static_assert(areKokkosViews<ExpView, ImpView>,
188  "Data arrays for DistributorActor::doPostsAndWaitsKokkos must be Kokkos::Views");
189  static_assert(areKokkosViews<ExpPacketsView, ImpPacketsView>,
190  "Num packets arrays for DistributorActor::doPostsAndWaitsKokkos must be Kokkos::Views");
191  doPostsKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
192  doWaits(plan);
193 }
194 
195 template <typename ViewType>
196 using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;
197 
198 template <typename DstViewType, typename SrcViewType>
199 using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
200  HostAccessibility<SrcViewType>::accessible>;
201 
202 template <typename DstViewType, typename SrcViewType>
203 using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
204  !HostAccessibility<SrcViewType>::accessible>;
205 
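// The two packOffset overloads below copy `size` entries from src (starting
// at src_offset) into dst (starting at dst_offset). They dispatch on whether
// both Views are host-accessible: if so, a plain memcpy suffices; otherwise
// the copy goes through Kokkos::Compat::deep_copy_offset, which handles
// device-resident data.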
206 template <typename DstViewType, typename SrcViewType>
207 enableIfHostAccessible<DstViewType, SrcViewType>
208 packOffset(const DstViewType& dst,
209  const SrcViewType& src,
210  const size_t dst_offset,
211  const size_t src_offset,
212  const size_t size)
213 {
214  memcpy((void*) (dst.data()+dst_offset), src.data()+src_offset, size*sizeof(typename DstViewType::value_type));
215 }
216 
217 template <typename DstViewType, typename SrcViewType>
218 enableIfNotHostAccessible<DstViewType, SrcViewType>
219 packOffset(const DstViewType& dst,
220  const SrcViewType& src,
221  const size_t dst_offset,
222  const size_t src_offset,
223  const size_t size)
224 {
225  Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
226 }
227 
228 // clang-format on
229 #ifdef HAVE_TPETRA_MPI
230 template <class ExpView, class ImpView>
231 void DistributorActor::doPostsAllToAll(const DistributorPlan &plan,
232  const ExpView &exports,
233  size_t numPackets,
234  const ImpView &imports) {
235  using size_type = Teuchos::Array<size_t>::size_type;
236 
237  TEUCHOS_TEST_FOR_EXCEPTION(
238  !plan.getIndicesTo().is_null(), std::runtime_error,
239  "Send Type=\"Alltoall\" only works for fast-path communication.");
240 
241  auto comm = plan.getComm();
242  const int myRank = comm->getRank();
243  std::vector<int> sendcounts(comm->getSize(), 0);
244  std::vector<int> sdispls(comm->getSize(), 0);
245  std::vector<int> recvcounts(comm->getSize(), 0);
246  std::vector<int> rdispls(comm->getSize(), 0);
247 
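 // Build the per-rank count/displacement arrays expected by MPI_Alltoallv.
 // Illustrative example (hypothetical values): with numPackets = 2 and sends
 // of lengths {3, 1} to procs {1, 3} on a 4-rank comm, the loop below yields
 // sendcounts = {0, 6, 0, 2}, with sdispls[p] taken from
 // getStartsTo()[p] * numPackets.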
248  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
249  for (size_t p = 0; p < numBlocks; ++p) {
250  sdispls[plan.getProcsTo()[p]] = plan.getStartsTo()[p] * numPackets;
251  size_t sendcount = plan.getLengthsTo()[p] * numPackets;
252  // sendcount is converted down to int, so make sure it can be represented
253  TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
254  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
255  "Send count for block "
256  << p << " (" << sendcount
257  << ") is too large "
258  "to be represented as int.");
259  sendcounts[plan.getProcsTo()[p]] = static_cast<int>(sendcount);
260  }
261 
262  const size_type actualNumReceives =
263  Teuchos::as<size_type>(plan.getNumReceives()) +
264  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
265  size_t curBufferOffset = 0;
266  for (size_type i = 0; i < actualNumReceives; ++i) {
267  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
268  TEUCHOS_TEST_FOR_EXCEPTION(
269  curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
270  std::logic_error,
271  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
272  "Exceeded size of 'imports' array in packing loop on Process "
273  << myRank << ". imports.size() = " << imports.size()
274  << " < "
275  "curBufferOffset("
276  << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
277  rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
278  // curBufLen is converted down to int, so make sure it can be represented
279  TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
280  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
281  "Recv count for receive "
282  << i << " (" << curBufLen
283  << ") is too large "
284  "to be represented as int.");
285  recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(curBufLen);
286  curBufferOffset += curBufLen;
287  }
288 
289  using T = typename ExpView::non_const_value_type;
290  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
291 
292 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
293  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
294  MPIX_Comm *mpixComm = *plan.getMPIXComm();
295  TEUCHOS_TEST_FOR_EXCEPTION(
296  !mpixComm, std::runtime_error,
297  "plan's MPIX_Comm null in doPostsAllToAll, but "
298  "DISTRIBUTOR_MPIADVANCE_ALLTOALL set: plan.howInitialized()="
299  << DistributorHowInitializedEnumToString(plan.howInitialized()));
300 
301  const int err = MPIX_Alltoallv(
302  exports.data(), sendcounts.data(), sdispls.data(), rawType,
303  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
304 
305  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
306  "MPIX_Alltoallv failed with error \""
307  << Teuchos::mpiErrorCodeToString(err)
308  << "\".");
309 
310  return;
311  }
312 #endif
313  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
314  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
315  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
316  mpiComm->getRawMpiComm();
317 
318  const int err = MPI_Alltoallv(
319  exports.data(), sendcounts.data(), sdispls.data(), rawType,
320  imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
321 
322  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
323  "MPI_Alltoallv failed with error \""
324  << Teuchos::mpiErrorCodeToString(err)
325  << "\".");
326 
327  return;
328 }
329 
330 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
331 template <class ExpView, class ImpView>
332 void DistributorActor::doPostsNbrAllToAllV(const DistributorPlan &plan,
333  const ExpView &exports,
334  size_t numPackets,
335  const ImpView &imports) {
336  TEUCHOS_TEST_FOR_EXCEPTION(
337  !plan.getIndicesTo().is_null(), std::runtime_error,
338  "Send Type=\"Alltoall\" only works for fast-path communication.");
339 
340  const int myRank = plan.getComm()->getRank();
341  MPIX_Comm *mpixComm = *plan.getMPIXComm();
342 
343  const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
344  const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
345  std::vector<int> sendcounts(numSends, 0);
346  std::vector<int> sdispls(numSends, 0);
347  std::vector<int> recvcounts(numRecvs, 0);
348  std::vector<int> rdispls(numRecvs, 0);
349 
350  for (size_t p = 0; p < numSends; ++p) {
351  sdispls[p] = plan.getStartsTo()[p] * numPackets;
352  const size_t sendcount = plan.getLengthsTo()[p] * numPackets;
353  // sendcount is converted down to int, so make sure it can be represented
354  TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
355  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
356  "Send count for block "
357  << p << " (" << sendcount
358  << ") is too large "
359  "to be represented as int.");
360  sendcounts[p] = static_cast<int>(sendcount);
361  }
362 
363  size_t curBufferOffset = 0;
364  for (size_t i = 0; i < numRecvs; ++i) {
365  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
366  TEUCHOS_TEST_FOR_EXCEPTION(
367  curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
368  std::logic_error,
369  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
370  "Exceeded size of 'imports' array in packing loop on Process "
371  << myRank << ". imports.size() = " << imports.size()
372  << " < "
373  "curBufferOffset("
374  << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
375  rdispls[i] = curBufferOffset;
376  // curBufLen is converted down to int, so make sure it can be represented
377  TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
378  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
379  "Recv count for receive "
380  << i << " (" << curBufLen
381  << ") is too large "
382  "to be represented as int.");
383  recvcounts[i] = static_cast<int>(curBufLen);
384  curBufferOffset += curBufLen;
385  }
386 
387  using T = typename ExpView::non_const_value_type;
388  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
389 
390  const int err = MPIX_Neighbor_alltoallv(
391  exports.data(), sendcounts.data(), sdispls.data(), rawType,
392  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
393 
394  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
395  "MPIX_Neighbor_alltoallv failed with error \""
396  << Teuchos::mpiErrorCodeToString(err)
397  << "\".");
398 }
399 #endif // HAVE_TPETRACORE_MPI_ADVANCE
400 #endif // HAVE_TPETRA_MPI
401 // clang-format off
402 
403 template <class ExpView, class ImpView>
404 void DistributorActor::doPosts(const DistributorPlan& plan,
405  const ExpView& exports,
406  size_t numPackets,
407  const ImpView& imports)
408 {
409  static_assert(areKokkosViews<ExpView, ImpView>,
410  "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
411  using Teuchos::Array;
412  using Teuchos::as;
413  using Teuchos::FancyOStream;
414  using Teuchos::includesVerbLevel;
415  using Teuchos::ireceive;
416  using Teuchos::isend;
417  using Teuchos::send;
418  using Teuchos::TypeNameTraits;
419  using Teuchos::typeName;
420  using std::endl;
421  using Kokkos::Compat::create_const_view;
422  using Kokkos::Compat::create_view;
423  using Kokkos::Compat::subview_offset;
424  using Kokkos::Compat::deep_copy_offset;
425  typedef Array<size_t>::size_type size_type;
426  typedef ExpView exports_view_type;
427  typedef ImpView imports_view_type;
428 
429 #ifdef KOKKOS_ENABLE_CUDA
430  static_assert
431  (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
432  ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
433  "Please do not use Tpetra::Distributor with UVM allocations. "
434  "See Trilinos GitHub issue #1088.");
435 #endif // KOKKOS_ENABLE_CUDA
436 
437 #ifdef KOKKOS_ENABLE_SYCL
438  static_assert
439  (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
440  ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
441  "Please do not use Tpetra::Distributor with SharedUSM allocations. "
442  "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
443 #endif // KOKKOS_ENABLE_SYCL
444 
445 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
446  Teuchos::TimeMonitor timeMon (*timer_doPosts3KV_);
447 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
448 
449  const int myRank = plan.getComm()->getRank ();
450  // Run-time configurable parameters that come from the input
451  // ParameterList set by setParameterList().
452  const Details::EDistributorSendType sendType = plan.getSendType();
453 
454 // clang-format on
455 #if defined(HAVE_TPETRA_MPI)
456  // All-to-all communication layout is quite different from
457  // point-to-point, so we handle it separately.
458 
459  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
460  doPostsAllToAll(plan, exports, numPackets, imports);
461  return;
462  }
463 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
464  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
465  doPostsAllToAll(plan, exports, numPackets, imports);
466  return;
467  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
468  doPostsNbrAllToAllV(plan, exports, numPackets, imports);
469  return;
470  }
471 #endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)
472 // clang-format off
473 
474 #else // HAVE_TPETRA_MPI
475  if (plan.hasSelfMessage()) {
476  // This is how we "send a message to ourself": we copy from
477  // the export buffer to the import buffer. That saves
478  // Teuchos::Comm implementations other than MpiComm (in
479  // particular, SerialComm) the trouble of implementing self
480  // messages correctly. (To do this right, SerialComm would
481  // need internal buffer space for messages, keyed on the
482  // message's tag.)
483  size_t selfReceiveOffset = 0;
484  deep_copy_offset(imports, exports, selfReceiveOffset,
485  plan.getStartsTo()[0]*numPackets,
486  plan.getLengthsTo()[0]*numPackets);
487  }
488  // should we just return here?
489  // likely not as comm could be a serial comm
490 #endif // HAVE_TPETRA_MPI
491 
492  size_t selfReceiveOffset = 0;
493 
494 #ifdef HAVE_TPETRA_DEBUG
495  TEUCHOS_TEST_FOR_EXCEPTION
496  (requests_.size () != 0,
497  std::logic_error,
498  "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
499  << myRank << ": requests_.size() = " << requests_.size () << " != 0.");
500 #endif // HAVE_TPETRA_DEBUG
501 
502  // Distributor uses requests_.size() as the number of outstanding
503  // nonblocking message requests, so we resize to zero to maintain
504  // this invariant.
505  //
506  // getNumReceives() does _not_ include the self message, if there is
507  // one. Here, we do actually send a message to ourselves, so we
508  // include any self message in the "actual" number of receives to
509  // post.
510  //
511  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
512  // doesn't (re)allocate its array of requests. That happens in
513  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
514  // demand), or Resize_().
515  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
516  as<size_type> (plan.hasSelfMessage() ? 1 : 0);
517  requests_.resize (0);
518 
519  // Post the nonblocking receives. It's common MPI wisdom to post
520  // receives before sends. In MPI terms, this means favoring
521  // adding to the "posted queue" (of receive requests) over adding
522  // to the "unexpected queue" (of arrived messages not yet matched
523  // with a receive).
524  {
525 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
526  Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3KV_recvs_);
527 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
528 
529  size_t curBufferOffset = 0;
530  for (size_type i = 0; i < actualNumReceives; ++i) {
531  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
532  if (plan.getProcsFrom()[i] != myRank) {
533  // If my process is receiving these packet(s) from another
534  // process (not a self-receive):
535  //
536  // 1. Set up the persisting view (recvBuf) of the imports
537  // array, given the offset and size (total number of
538  // packets from process getProcsFrom()[i]).
539  // 2. Start the Irecv and save the resulting request.
540  TEUCHOS_TEST_FOR_EXCEPTION(
541  curBufferOffset + curBufLen > static_cast<size_t> (imports.size ()),
542  std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
543  "Exceeded size of 'imports' array in packing loop on Process " <<
544  myRank << ". imports.size() = " << imports.size () << " < "
545  "curBufferOffset(" << curBufferOffset << ") + curBufLen(" <<
546  curBufLen << ").");
547  imports_view_type recvBuf =
548  subview_offset (imports, curBufferOffset, curBufLen);
549  requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
550  mpiTag_, *plan.getComm()));
551  }
552  else { // Receiving from myself
553  selfReceiveOffset = curBufferOffset; // Remember the self-recv offset
554  }
555  curBufferOffset += curBufLen;
556  }
557  }
558 
559 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
560  Teuchos::TimeMonitor timeMonSends (*timer_doPosts3KV_sends_);
561 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
562 
563  // setup scan through getProcsTo() list starting with higher numbered procs
564  // (should help balance message traffic)
565  //
566  // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
567  // It doesn't depend on the input at all.
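 // Illustrative example: on rank 2 with getProcsTo() = {0, 1, 3, 4}, the
 // while loop below leaves procIndex at the entry for proc 3, so the send
 // loop visits procs 3, 4, 0, 1 in that order.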
568  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
569  size_t procIndex = 0;
570  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
571  ++procIndex;
572  }
573  if (procIndex == numBlocks) {
574  procIndex = 0;
575  }
576 
577  size_t selfNum = 0;
578  size_t selfIndex = 0;
579 
580  if (plan.getIndicesTo().is_null()) {
581 
582 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
583  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_fast_);
584 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
585 
586  // Data are already blocked (laid out) by process, so we don't
587  // need a separate send buffer (besides the exports array).
588  for (size_t i = 0; i < numBlocks; ++i) {
589  size_t p = i + procIndex;
590  if (p > (numBlocks - 1)) {
591  p -= numBlocks;
592  }
593 
594  if (plan.getProcsTo()[p] != myRank) {
595  exports_view_type tmpSend = subview_offset(
596  exports, plan.getStartsTo()[p]*numPackets, plan.getLengthsTo()[p]*numPackets);
597 
598  if (sendType == Details::DISTRIBUTOR_ISEND) {
599  // NOTE: This looks very similar to the tmpSend above, but removing
600  // tmpSendBuf and using tmpSend instead leads to a performance hit on Arm
601  // SerialNode builds
602  exports_view_type tmpSendBuf =
603  subview_offset (exports, plan.getStartsTo()[p] * numPackets,
604  plan.getLengthsTo()[p] * numPackets);
605  requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
606  mpiTag_, *plan.getComm()));
607  }
608  else { // DISTRIBUTOR_SEND
609  send<int> (tmpSend,
610  as<int> (tmpSend.size ()),
611  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
612  }
613  }
614  else { // "Sending" the message to myself
615  selfNum = p;
616  }
617  }
618 
619  if (plan.hasSelfMessage()) {
620  // This is how we "send a message to ourself": we copy from
621  // the export buffer to the import buffer. That saves
622  // Teuchos::Comm implementations other than MpiComm (in
623  // particular, SerialComm) the trouble of implementing self
624  // messages correctly. (To do this right, SerialComm would
625  // need internal buffer space for messages, keyed on the
626  // message's tag.)
627  deep_copy_offset(imports, exports, selfReceiveOffset,
628  plan.getStartsTo()[selfNum]*numPackets,
629  plan.getLengthsTo()[selfNum]*numPackets);
630  }
631 
632  }
633  else { // data are not blocked by proc, use send buffer
634 
635 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
636  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_slow_);
637 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
638 
639  typedef typename ExpView::non_const_value_type Packet;
640  typedef typename ExpView::array_layout Layout;
641  typedef typename ExpView::device_type Device;
642  typedef typename ExpView::memory_traits Mem;
643 
644  // This buffer is long enough for only one message at a time.
645  // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
646  // of sendType requested by user.
647  // This code path formerly errored out with message:
648  // Tpetra::Distributor::doPosts(3 args, Kokkos):
649  // The "send buffer" code path
650  // doesn't currently work with nonblocking sends.
651  // Now, we opt to just do the communication in a way that works.
652 #ifdef HAVE_TPETRA_DEBUG
653  if (sendType != Details::DISTRIBUTOR_SEND) {
654  if (plan.getComm()->getRank() == 0)
655  std::cout << "The requested Tpetra send type "
657  << " requires Distributor data to be ordered by"
658  << " the receiving processor rank. Since these"
659  << " data are not ordered, Tpetra will use Send"
660  << " instead." << std::endl;
661  }
662 #endif
663 
664  Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
665  plan.getMaxSendLength() * numPackets);
666 
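 // Slow path: getIndicesTo() is non-null, so a destination's data may be
 // scattered through 'exports'. For each destination, pack its packets
 // contiguously into sendArray (one message at a time), fence to make the
 // packed data visible, then do a blocking send.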
667  for (size_t i = 0; i < numBlocks; ++i) {
668  size_t p = i + procIndex;
669  if (p > (numBlocks - 1)) {
670  p -= numBlocks;
671  }
672 
673  if (plan.getProcsTo()[p] != myRank) {
674  size_t sendArrayOffset = 0;
675  size_t j = plan.getStartsTo()[p];
676  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
677  packOffset(sendArray, exports, sendArrayOffset, plan.getIndicesTo()[j]*numPackets, numPackets);
678  sendArrayOffset += numPackets;
679  }
680  typename ExpView::execution_space().fence();
681 
682  ImpView tmpSend =
683  subview_offset(sendArray, size_t(0), plan.getLengthsTo()[p]*numPackets);
684 
685  send<int> (tmpSend,
686  as<int> (tmpSend.size ()),
687  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
688  }
689  else { // "Sending" the message to myself
690  selfNum = p;
691  selfIndex = plan.getStartsTo()[p];
692  }
693  }
694 
695  if (plan.hasSelfMessage()) {
696  for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
697  packOffset(imports, exports, selfReceiveOffset, plan.getIndicesTo()[selfIndex]*numPackets, numPackets);
698  ++selfIndex;
699  selfReceiveOffset += numPackets;
700  }
701  }
702  }
703 }
704 
705 // clang-format on
706 #ifdef HAVE_TPETRA_MPI
707 template <class ExpView, class ImpView>
708 void DistributorActor::doPostsAllToAll(
709  const DistributorPlan &plan, const ExpView &exports,
710  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
711  const ImpView &imports,
712  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
713  TEUCHOS_TEST_FOR_EXCEPTION(
714  !plan.getIndicesTo().is_null(), std::runtime_error,
715  "Send Type=\"Alltoall\" only works for fast-path communication.");
716 
717  using size_type = Teuchos::Array<size_t>::size_type;
718 
719  auto comm = plan.getComm();
720  std::vector<int> sendcounts(comm->getSize(), 0);
721  std::vector<int> sdispls(comm->getSize(), 0);
722  std::vector<int> recvcounts(comm->getSize(), 0);
723  std::vector<int> rdispls(comm->getSize(), 0);
724 
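 // Unlike the fixed-numPackets overload above, each LID may carry a
 // different number of packets, so the per-destination send count is the sum
 // of numExportPacketsPerLID over that destination's contiguous LID range.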
725  size_t curPKToffset = 0;
726  for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
727  sdispls[plan.getProcsTo()[pp]] = curPKToffset;
728  size_t numPackets = 0;
729  for (size_t j = plan.getStartsTo()[pp];
730  j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
731  numPackets += numExportPacketsPerLID[j];
732  }
733  // numPackets is converted down to int, so make sure it can be represented
734  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
735  "Tpetra::Distributor::doPosts(4 args, Kokkos): "
736  "Send count for send "
737  << pp << " (" << numPackets
738  << ") is too large "
739  "to be represented as int.");
740  sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
741  curPKToffset += numPackets;
742  }
743 
744  const size_type actualNumReceives =
745  Teuchos::as<size_type>(plan.getNumReceives()) +
746  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
747 
748  size_t curBufferOffset = 0;
749  size_t curLIDoffset = 0;
750  for (size_type i = 0; i < actualNumReceives; ++i) {
751  size_t totalPacketsFrom_i = 0;
752  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
753  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
754  }
755  curLIDoffset += plan.getLengthsFrom()[i];
756 
757  rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
758  // totalPacketsFrom_i is converted down to int, so make sure it can be
759  // represented
760  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
761  std::logic_error,
762  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
763  "Recv count for receive "
764  << i << " (" << totalPacketsFrom_i
765  << ") is too large "
766  "to be represented as int.");
767  recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
768  curBufferOffset += totalPacketsFrom_i;
769  }
770 
771  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
772  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
773  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
774  mpiComm->getRawMpiComm();
775  using T = typename ExpView::non_const_value_type;
776  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
777 
778 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
779  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
780  MPIX_Comm *mpixComm = *plan.getMPIXComm();
781  TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
782  "MPIX_Comm is null in doPostsAllToAll \""
783  << __FILE__ << ":" << __LINE__);
784 
785  const int err = MPIX_Alltoallv(
786  exports.data(), sendcounts.data(), sdispls.data(), rawType,
787  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
788 
789  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
790  "MPIX_Alltoallv failed with error \""
791  << Teuchos::mpiErrorCodeToString(err)
792  << "\".");
793 
794  return;
795  }
796 #endif // HAVE_TPETRACORE_MPI_ADVANCE
797 
798  const int err = MPI_Alltoallv(
799  exports.data(), sendcounts.data(), sdispls.data(), rawType,
800  imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
801 
802  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
803  "MPI_Alltoallv failed with error \""
804  << Teuchos::mpiErrorCodeToString(err)
805  << "\".");
806 }
807 
808 template <class ExpView, class ExpPacketsView, class ImpView, class ImpPacketsView>
809 void DistributorActor::doPostsAllToAllKokkos(
810  const DistributorPlan &plan, const ExpView &exports,
811  const ExpPacketsView &numExportPacketsPerLID,
812  const ImpView &imports,
813  const ImpPacketsView &numImportPacketsPerLID) {
814  TEUCHOS_TEST_FOR_EXCEPTION(
815  !plan.getIndicesTo().is_null(), std::runtime_error,
816  "Send Type=\"Alltoall\" only works for fast-path communication.");
817 
818  using size_type = Teuchos::Array<size_t>::size_type;
819  using ExpExecSpace = typename ExpPacketsView::execution_space;
820  using ImpExecSpace = typename ImpPacketsView::execution_space;
821 
822  auto comm = plan.getComm();
823  Kokkos::View<int*, Kokkos::DefaultHostExecutionSpace> sendcounts("sendcounts", comm->getSize());
824  Kokkos::View<int*, Kokkos::DefaultHostExecutionSpace> sdispls("sdispls", comm->getSize());
825  Kokkos::View<int*, Kokkos::DefaultHostExecutionSpace> recvcounts("recvcounts", comm->getSize());
826  Kokkos::View<int*, Kokkos::DefaultHostExecutionSpace> rdispls("rdispls", comm->getSize());
827 
828  auto sendcounts_d = Kokkos::create_mirror_view(ExpExecSpace(), sendcounts);
829  auto sdispls_d = Kokkos::create_mirror_view(ExpExecSpace(), sdispls);
830  auto recvcounts_d = Kokkos::create_mirror_view(ImpExecSpace(), recvcounts);
831  auto rdispls_d = Kokkos::create_mirror_view(ImpExecSpace(), rdispls);
832 
833  auto getStartsTo = Kokkos::Compat::getKokkosViewDeepCopy<ExpExecSpace>(plan.getStartsTo());
834  auto getLengthsTo = Kokkos::Compat::getKokkosViewDeepCopy<ExpExecSpace>(plan.getLengthsTo());
835  auto getProcsTo = Kokkos::Compat::getKokkosViewDeepCopy<ExpExecSpace>(plan.getProcsTo());
836 
837  size_t curPKToffset = 0;
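 // Device-side exclusive scan over the sends: for each send pp, record the
 // running packet offset as sdispls for that destination, total this send's
 // packets from numExportPacketsPerLID, and store the (int-cast) total in
 // sendcounts. The scan's overall sum is returned in curPKToffset.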
838  Kokkos::parallel_scan(Kokkos::RangePolicy<ExpExecSpace>(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, size_t& offset, bool is_final) {
839  sdispls_d(getProcsTo(pp)) = offset;
840  size_t numPackets = 0;
841  for (size_t j = getStartsTo(pp); j < getStartsTo(pp) + getLengthsTo(pp); ++j) {
842  numPackets += numExportPacketsPerLID(j);
843  }
844  sendcounts_d(getProcsTo(pp)) = static_cast<int>(numPackets);
845  offset += numPackets;
846  }, curPKToffset);
847 
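 // The int casts above wrap to a negative value if a count exceeds INT_MAX.
 // The reduction below is nonzero exactly when some sendcounts entry is
 // negative; the check that follows reports that as an overflow.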
848  int overflow;
849  Kokkos::parallel_reduce(Kokkos::RangePolicy<ExpExecSpace>(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, int& index) {
850  if(sendcounts_d(getProcsTo(pp)) < 0) {
851  index = pp+1;
852  }
853  }, overflow);
854 
855  // numPackets is converted down to int, so make sure it can be represented
856  TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error,
857  "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): "
858  "Send count for send "
859  << overflow-1 << " is too large "
860  "to be represented as int.");
861 
862  const size_type actualNumReceives =
863  Teuchos::as<size_type>(plan.getNumReceives()) +
864  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
865 
866  auto getLengthsFrom = Kokkos::Compat::getKokkosViewDeepCopy<ImpExecSpace>(plan.getLengthsFrom());
867  auto getProcsFrom = Kokkos::Compat::getKokkosViewDeepCopy<ImpExecSpace>(plan.getProcsFrom());
868 
869  Kokkos::View<size_t*, ImpExecSpace> curLIDoffset("curLIDoffset", actualNumReceives);
870  Kokkos::parallel_scan(Kokkos::RangePolicy<ImpExecSpace>(0, actualNumReceives), KOKKOS_LAMBDA(const size_type i, size_t& offset, bool is_final) {
871  if(is_final) curLIDoffset(i) = offset;
872  offset += getLengthsFrom(i);
873  });
874 
875  Kokkos::parallel_scan(Kokkos::RangePolicy<ImpExecSpace>(0, actualNumReceives), KOKKOS_LAMBDA(const size_type i, size_t& curBufferOffset, bool is_final) {
876  size_t totalPacketsFrom_i = 0;
877  for(size_t j = 0; j < getLengthsFrom(i); j++) {
878  totalPacketsFrom_i += numImportPacketsPerLID(curLIDoffset(i) + j);
879  }
880 
881  if(is_final) rdispls_d(getProcsFrom(i)) = curBufferOffset;
882  if(is_final) recvcounts_d(getProcsFrom(i)) = static_cast<int>(totalPacketsFrom_i);
883  curBufferOffset += totalPacketsFrom_i;
884  });
885 
886  Kokkos::parallel_reduce(Kokkos::RangePolicy<ImpExecSpace>(0, actualNumReceives), KOKKOS_LAMBDA(const size_type i, int& index) {
887  if(recvcounts_d(getProcsFrom(i)) < 0) {
888  index = i+1;
889  }
890  }, overflow);
891 
892  // totalPacketsFrom_i is converted down to int, so make sure it can be
893  // represented
894  TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error,
895  "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): "
896  "Recv count for receive "
897  << overflow-1 << " is too large "
898  "to be represented as int.");
899 
900  Kokkos::deep_copy(sendcounts, sendcounts_d);
901  Kokkos::deep_copy(sdispls, sdispls_d);
902  Kokkos::deep_copy(recvcounts, recvcounts_d);
903  Kokkos::deep_copy(rdispls, rdispls_d);
904 
905  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
906  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
907  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
908  mpiComm->getRawMpiComm();
909  using T = typename ExpView::non_const_value_type;
910  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
911 
912 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
913  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
914  MPIX_Comm *mpixComm = *plan.getMPIXComm();
915  TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
916  "MPIX_Comm is null in doPostsAllToAll \""
917  << __FILE__ << ":" << __LINE__);
918 
919  const int err = MPIX_Alltoallv(
920  exports.data(), sendcounts.data(), sdispls.data(), rawType,
921  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
922 
923  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
924  "MPIX_Alltoallv failed with error \""
925  << Teuchos::mpiErrorCodeToString(err)
926  << "\".");
927 
928  return;
929  }
930 #endif // HAVE_TPETRACORE_MPI_ADVANCE
931 
932  const int err = MPI_Alltoallv(
933  exports.data(), sendcounts.data(), sdispls.data(), rawType,
934  imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
935 
936  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
937  "MPI_Alltoallv failed with error \""
938  << Teuchos::mpiErrorCodeToString(err)
939  << "\".");
940 }
941 
942 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
943 template <class ExpView, class ImpView>
944 void DistributorActor::doPostsNbrAllToAllV(
945  const DistributorPlan &plan, const ExpView &exports,
946  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
947  const ImpView &imports,
948  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
949  TEUCHOS_TEST_FOR_EXCEPTION(
950  !plan.getIndicesTo().is_null(), std::runtime_error,
951  "Send Type=\"Alltoall\" only works for fast-path communication.");
952 
953  const Teuchos_Ordinal numSends = plan.getProcsTo().size();
954  const Teuchos_Ordinal numRecvs = plan.getProcsFrom().size();
955 
956  auto comm = plan.getComm();
957  std::vector<int> sendcounts(numSends, 0);
958  std::vector<int> sdispls(numSends, 0);
959  std::vector<int> recvcounts(numRecvs, 0);
960  std::vector<int> rdispls(numRecvs, 0);
961 
962  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
963  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
964  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
965  mpiComm->getRawMpiComm();
966  using T = typename ExpView::non_const_value_type;
967  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
968 
969  // unlike standard alltoall, entry `i` in sdispls and sendcounts
970  // refer to the ith participating rank, rather than rank i
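 // (For example, if this rank sends only to procs 5 and 7, sendcounts and
 // sdispls each have two entries: index 0 for proc 5 and index 1 for proc 7.)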
971  size_t curPKToffset = 0;
972  for (Teuchos_Ordinal pp = 0; pp < numSends; ++pp) {
973  sdispls[pp] = curPKToffset;
974  size_t numPackets = 0;
975  for (size_t j = plan.getStartsTo()[pp];
976  j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
977  numPackets += numExportPacketsPerLID[j];
978  }
979  // numPackets is converted down to int, so make sure it can be represented
980  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
981  "Tpetra::Distributor::doPosts(4 args, Kokkos): "
982  "Send count for send "
983  << pp << " (" << numPackets
984  << ") is too large "
985  "to be represented as int.");
986  sendcounts[pp] = static_cast<int>(numPackets);
987  curPKToffset += numPackets;
988  }
989  size_t curBufferOffset = 0;
990  size_t curLIDoffset = 0;
991  for (Teuchos_Ordinal i = 0; i < numRecvs; ++i) {
992  size_t totalPacketsFrom_i = 0;
993  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
994  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
995  }
996  curLIDoffset += plan.getLengthsFrom()[i];
997 
998  rdispls[i] = curBufferOffset;
999  // totalPacketsFrom_i is converted down to int, so make sure it can be
1000  // represented
1001  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
1002  std::logic_error,
1003  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
1004  "Recv count for receive "
1005  << i << " (" << totalPacketsFrom_i
1006  << ") is too large "
1007  "to be represented as int.");
1008  recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
1009  curBufferOffset += totalPacketsFrom_i;
1010  }
1011 
1012  MPIX_Comm *mpixComm = *plan.getMPIXComm();
1013  const int err = MPIX_Neighbor_alltoallv(
1014  exports.data(), sendcounts.data(), sdispls.data(), rawType,
1015  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
1016 
1017  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
1018  "MPIX_Neighbor_alltoallv failed with error \""
1019  << Teuchos::mpiErrorCodeToString(err)
1020  << "\".");
1021 }
1022 
1023 template <class ExpView, class ExpPacketsView, class ImpView, class ImpPacketsView>
1024 void DistributorActor::doPostsNbrAllToAllVKokkos(
1025  const DistributorPlan &plan, const ExpView &exports,
1026  const ExpPacketsView &numExportPacketsPerLID,
1027  const ImpView &imports,
1028  const ImpPacketsView &numImportPacketsPerLID) {
1029  TEUCHOS_TEST_FOR_EXCEPTION(
1030  !plan.getIndicesTo().is_null(), std::runtime_error,
1031  "Send Type=\"Alltoall\" only works for fast-path communication.");
1032 
1033  const Teuchos_Ordinal numSends = plan.getProcsTo().size();
1034  const Teuchos_Ordinal numRecvs = plan.getProcsFrom().size();
1035 
1036  auto comm = plan.getComm();
1037  Kokkos::View<int*, Kokkos::DefaultHostExecutionSpace> sendcounts("sendcounts", comm->getSize());
1038  Kokkos::View<int*, Kokkos::DefaultHostExecutionSpace> sdispls("sdispls", comm->getSize());
1039  Kokkos::View<int*, Kokkos::DefaultHostExecutionSpace> recvcounts("recvcounts", comm->getSize());
1040  Kokkos::View<int*, Kokkos::DefaultHostExecutionSpace> rdispls("rdispls", comm->getSize());
1041 
1042  using ExpExecSpace = typename ExpPacketsView::execution_space;
1043  using ImpExecSpace = typename ImpPacketsView::execution_space;
1044  auto sendcounts_d = Kokkos::create_mirror_view(ExpExecSpace(), sendcounts);
1045  auto sdispls_d = Kokkos::create_mirror_view(ExpExecSpace(), sdispls);
1046  auto recvcounts_d = Kokkos::create_mirror_view(ImpExecSpace(), recvcounts);
1047  auto rdispls_d = Kokkos::create_mirror_view(ImpExecSpace(), rdispls);
1048 
1049  auto getStartsTo = Kokkos::Compat::getKokkosViewDeepCopy<ExpExecSpace>(plan.getStartsTo());
1050  auto getLengthsTo = Kokkos::Compat::getKokkosViewDeepCopy<ExpExecSpace>(plan.getLengthsTo());
1051 
1052  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
1053  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
1054  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
1055  mpiComm->getRawMpiComm();
1056  using T = typename ExpView::non_const_value_type;
1057  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
1058 
1059  // unlike standard alltoall, entry `i` in sdispls and sendcounts
1060  // refer to the ith participating rank, rather than rank i
1061  Kokkos::parallel_scan(Kokkos::RangePolicy<ExpExecSpace>(0, numSends), KOKKOS_LAMBDA(const Teuchos_Ordinal pp, size_t& curPKToffset, bool is_final) {
1062  sdispls_d(pp) = curPKToffset;
1063  size_t numPackets = 0;
1064  for (size_t j = getStartsTo(pp); j < getStartsTo(pp) + getLengthsTo(pp); ++j) {
1065  numPackets += numExportPacketsPerLID(j);
1066  }
1067  sendcounts_d(pp) = static_cast<int>(numPackets);
1068  curPKToffset += numPackets;
1069  });
1070 
1071  int overflow;
1072  Kokkos::parallel_reduce(Kokkos::RangePolicy<ExpExecSpace>(0, numSends), KOKKOS_LAMBDA(const Teuchos_Ordinal pp, int& index) {
1073  if(sendcounts_d(pp) < 0) {
1074  index = pp+1;
1075  }
1076  }, overflow);
1077 
1078  // numPackets is converted down to int, so make sure it can be represented
1079  TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error,
1080  "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): "
1081  "Send count for send "
1082  << overflow-1 << " is too large "
1083  "to be represented as int.");
1084 
1085  auto getLengthsFrom = Kokkos::Compat::getKokkosViewDeepCopy<ImpExecSpace>(plan.getLengthsFrom());
1086 
1087  Kokkos::View<size_t*, ImpExecSpace> curLIDoffset("curLIDoffset", numRecvs);
1088  Kokkos::parallel_scan(Kokkos::RangePolicy<ImpExecSpace>(0, numRecvs), KOKKOS_LAMBDA(const Teuchos_Ordinal i, size_t& offset, bool is_final) {
1089  if(is_final) curLIDoffset(i) = offset;
1090  offset += getLengthsFrom(i);
1091  });
1092 
1093  Kokkos::parallel_scan(Kokkos::RangePolicy<ImpExecSpace>(0, numRecvs), KOKKOS_LAMBDA(const Teuchos_Ordinal i, size_t& curBufferOffset, bool is_final) {
1094  rdispls_d(i) = curBufferOffset;
1095  size_t totalPacketsFrom_i = 0;
1096  for(size_t j = 0; j < getLengthsFrom(i); j++) {
1097  totalPacketsFrom_i += numImportPacketsPerLID(curLIDoffset(i) + j);
1098  }
1099 
1100  recvcounts_d(i) = static_cast<int>(totalPacketsFrom_i);
1101  curBufferOffset += totalPacketsFrom_i;
1102  });
1103 
1104  Kokkos::parallel_reduce(Kokkos::RangePolicy<ImpExecSpace>(0, numRecvs), KOKKOS_LAMBDA(const Teuchos_Ordinal i, int& index) {
1105  if(recvcounts_d(i) < 0) {
1106  index = i+1;
1107  }
1108  }, overflow);
1109 
1110  // totalPacketsFrom_i is converted down to int, so make sure it can be
1111  // represented
1112  TEUCHOS_TEST_FOR_EXCEPTION(overflow, std::logic_error,
1113  "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): "
1114  "Recv count for receive "
1115  << overflow-1 << " is too large "
1116  "to be represented as int.");
1117 
1118  Kokkos::deep_copy(sendcounts, sendcounts_d);
1119  Kokkos::deep_copy(sdispls, sdispls_d);
1120  Kokkos::deep_copy(recvcounts, recvcounts_d);
1121  Kokkos::deep_copy(rdispls, rdispls_d);
1122 
1123  MPIX_Comm *mpixComm = *plan.getMPIXComm();
1124  const int err = MPIX_Neighbor_alltoallv(
1125  exports.data(), sendcounts.data(), sdispls.data(), rawType,
1126  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
1127 
1128  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
1129  "MPIX_Neighbor_alltoallv failed with error \""
1130  << Teuchos::mpiErrorCodeToString(err)
1131  << "\".");
1132 }
1133 #endif // HAVE_TPETRACORE_MPI_ADVANCE
1134 #endif // HAVE_TPETRA_MPI
1135  // clang-format off
1136 
1137 template <class ExpView, class ImpView>
1138 void DistributorActor::doPosts(const DistributorPlan& plan,
1139  const ExpView &exports,
1140  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
1141  const ImpView &imports,
1142  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
1143 {
1144  static_assert(areKokkosViews<ExpView, ImpView>,
1145  "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
1146  using Teuchos::Array;
1147  using Teuchos::as;
1148  using Teuchos::ireceive;
1149  using Teuchos::isend;
1150  using Teuchos::send;
1151  using Teuchos::TypeNameTraits;
1152  using std::endl;
1153  using Kokkos::Compat::create_const_view;
1154  using Kokkos::Compat::create_view;
1155  using Kokkos::Compat::subview_offset;
1156  using Kokkos::Compat::deep_copy_offset;
1157  typedef Array<size_t>::size_type size_type;
1158  typedef ExpView exports_view_type;
1159  typedef ImpView imports_view_type;
1160 
1161 #ifdef KOKKOS_ENABLE_CUDA
1162  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
1163  ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
1164  "Please do not use Tpetra::Distributor with UVM "
1165  "allocations. See GitHub issue #1088.");
1166 #endif // KOKKOS_ENABLE_CUDA
1167 
1168 #ifdef KOKKOS_ENABLE_SYCL
1169  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
1170  ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
1171  "Please do not use Tpetra::Distributor with SharedUSM "
1172  "allocations. See GitHub issue #1088 (corresponding to CUDA).");
1173 #endif // KOKKOS_ENABLE_SYCL
1174 
1175 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1176  Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_);
1177 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1178 
1179  // Run-time configurable parameters that come from the input
1180  // ParameterList set by setParameterList().
1181  const Details::EDistributorSendType sendType = plan.getSendType();
1182 
1183 #ifdef HAVE_TPETRA_MPI
1184  // All-to-all communication layout is quite different from
1185  // point-to-point, so we handle it separately.
1186  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
1187  doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
1188  return;
1189  }
1190 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
1191  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL)
1192  {
1193  doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
1194  return;
1195  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
1196  doPostsNbrAllToAllV(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
1197  return;
1198  }
1199 #endif
1200 
1201 #else // HAVE_TPETRA_MPI
1202  if (plan.hasSelfMessage()) {
1203 
1204  size_t selfReceiveOffset = 0;
1205 
1206  // setup arrays containing starting-offsets into exports for each send,
1207  // and num-packets-to-send for each send.
1208  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
1209  size_t maxNumPackets = 0;
1210  size_t curPKToffset = 0;
1211  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
1212  sendPacketOffsets[pp] = curPKToffset;
1213  size_t numPackets = 0;
1214  for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
1215  numPackets += numExportPacketsPerLID[j];
1216  }
1217  if (numPackets > maxNumPackets) maxNumPackets = numPackets;
1218  packetsPerSend[pp] = numPackets;
1219  curPKToffset += numPackets;
1220  }
1221 
1222  deep_copy_offset(imports, exports, selfReceiveOffset,
1223  sendPacketOffsets[0], packetsPerSend[0]);
1224  }
1225 #endif // HAVE_TPETRA_MPI
1226 
1227  const int myProcID = plan.getComm()->getRank ();
1228  size_t selfReceiveOffset = 0;
1229 
1230 #ifdef HAVE_TPETRA_DEBUG
1231  // Different messages may have different numbers of packets.
1232  size_t totalNumImportPackets = 0;
1233  for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
1234  totalNumImportPackets += numImportPacketsPerLID[ii];
1235  }
1236  TEUCHOS_TEST_FOR_EXCEPTION(
1237  imports.extent (0) < totalNumImportPackets, std::runtime_error,
1238  "Tpetra::Distributor::doPosts(4 args, Kokkos): The 'imports' array must have "
1239  "enough entries to hold the expected number of import packets. "
1240  "imports.extent(0) = " << imports.extent (0) << " < "
1241  "totalNumImportPackets = " << totalNumImportPackets << ".");
1242  TEUCHOS_TEST_FOR_EXCEPTION
1243  (requests_.size () != 0, std::logic_error, "Tpetra::Distributor::"
1244  "doPosts(4 args, Kokkos): Process " << myProcID << ": requests_.size () = "
1245  << requests_.size () << " != 0.");
1246 #endif // HAVE_TPETRA_DEBUG
1247  // Distributor uses requests_.size() as the number of outstanding
1248  // nonblocking message requests, so we resize to zero to maintain
1249  // this invariant.
1250  //
1251  // getNumReceives() does _not_ include the self message, if there is
1252  // one. Here, we do actually send a message to ourselves, so we
1253  // include any self message in the "actual" number of receives to
1254  // post.
1255  //
1256  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
1257  // doesn't (re)allocate its array of requests. That happens in
1258  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
1259  // demand), or Resize_().
1260  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
1261  as<size_type> (plan.hasSelfMessage() ? 1 : 0);
1262  requests_.resize (0);
1263 
1264  // Post the nonblocking receives. It's common MPI wisdom to post
1265  // receives before sends. In MPI terms, this means favoring
1266  // adding to the "posted queue" (of receive requests) over adding
1267  // to the "unexpected queue" (of arrived messages not yet matched
1268  // with a receive).
1269  {
1270 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1271  Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_);
1272 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1273 
1274  size_t curBufferOffset = 0;
1275  size_t curLIDoffset = 0;
1276  for (size_type i = 0; i < actualNumReceives; ++i) {
1277  size_t totalPacketsFrom_i = 0;
1278  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
1279  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
1280  }
1281  // totalPacketsFrom_i is converted down to int, so make sure it can be represented
1282  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
1283  std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
1284  "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large "
1285  "to be represented as int.");
1286  curLIDoffset += plan.getLengthsFrom()[i];
1287  if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
1288  // If my process is receiving these packet(s) from another
1289  // process (not a self-receive), and if there is at least
1290  // one packet to receive:
1291  //
1292  // 1. Set up the persisting view (recvBuf) into the imports
1293  // array, given the offset and size (total number of
1294  // packets from process getProcsFrom()[i]).
1295  // 2. Start the Irecv and save the resulting request.
1296  imports_view_type recvBuf =
1297  subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
1298  requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
1299  mpiTag_, *plan.getComm()));
1300  }
1301  else { // Receiving these packet(s) from myself
1302  selfReceiveOffset = curBufferOffset; // Remember the offset
1303  }
1304  curBufferOffset += totalPacketsFrom_i;
1305  }
1306  }
1307 
1308 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1309  Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_);
1310 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1311 
1312  // setup arrays containing starting-offsets into exports for each send,
1313  // and num-packets-to-send for each send.
1314  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
1315  size_t maxNumPackets = 0;
1316  size_t curPKToffset = 0;
1317  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
1318  sendPacketOffsets[pp] = curPKToffset;
1319  size_t numPackets = 0;
1320  for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
1321  numPackets += numExportPacketsPerLID[j];
1322  }
1323  if (numPackets > maxNumPackets) maxNumPackets = numPackets;
1324  // numPackets will be used as a message length, so make sure it can be represented as int
1325  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX),
1326  std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
1327  "packetsPerSend[" << pp << "] = " << numPackets << " is too large "
1328  "to be represented as int.");
1329  packetsPerSend[pp] = numPackets;
1330  curPKToffset += numPackets;
1331  }
1332 
1333  // setup scan through getProcsTo() list starting with higher numbered procs
1334  // (should help balance message traffic)
1335  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
1336  size_t procIndex = 0;
1337  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) {
1338  ++procIndex;
1339  }
1340  if (procIndex == numBlocks) {
1341  procIndex = 0;
1342  }
1343 
1344  size_t selfNum = 0;
1345  size_t selfIndex = 0;
1346  if (plan.getIndicesTo().is_null()) {
1347 
1348 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1349  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_);
1350 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1351 
1352  // Data are already blocked (laid out) by process, so we don't
1353  // need a separate send buffer (besides the exports array).
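 // Illustration (hypothetical layout, for exposition only): with
 // getProcsTo() = {1, 4} and getIndicesTo() null, exports is already
 // grouped contiguously as
 //   [ packets bound for proc 1 | packets bound for proc 4 ]
 // so each send is just a subview of exports at sendPacketOffsets[p].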
1354  for (size_t i = 0; i < numBlocks; ++i) {
1355  size_t p = i + procIndex;
1356  if (p > (numBlocks - 1)) {
1357  p -= numBlocks;
1358  }
1359 
1360  if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) {
1361  exports_view_type tmpSend =
1362  subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);
1363 
1364  if (sendType == Details::DISTRIBUTOR_ISEND) {
1365  exports_view_type tmpSendBuf =
1366  subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
1367  requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
1368  mpiTag_, *plan.getComm()));
1369  }
1370  else { // DISTRIBUTOR_SEND
1371  send<int> (tmpSend,
1372  as<int> (tmpSend.size ()),
1373  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1374  }
1375  }
1376  else { // "Sending" the message to myself
1377  selfNum = p;
1378  }
1379  }
1380 
1381  if (plan.hasSelfMessage()) {
1382  deep_copy_offset(imports, exports, selfReceiveOffset,
1383  sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
1384  }
1385  }
1386  else { // data are not blocked by proc, use send buffer
1387 
1388 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1389  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_);
1390 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1391 
1392  // FIXME (mfh 05 Mar 2013) This may be broken for Isend.
1393  typedef typename ExpView::non_const_value_type Packet;
1394  typedef typename ExpView::array_layout Layout;
1395  typedef typename ExpView::device_type Device;
1396  typedef typename ExpView::memory_traits Mem;
1397 
1398  // This buffer is long enough for only one message at a time.
1399  // Thus, we always use DISTRIBUTOR_SEND in this case, regardless
1400  // of the sendType requested by the user.
1401  // This code path formerly errored out with the message:
1402  // Tpetra::Distributor::doPosts(4-arg, Kokkos):
1403  // The "send buffer" code path
1404  // doesn't currently work with nonblocking sends.
1405  // Now, we opt to just do the communication in a way that works.
1406 #ifdef HAVE_TPETRA_DEBUG
1407  if (sendType != Details::DISTRIBUTOR_SEND) {
1408  if (plan.getComm()->getRank() == 0)
1409  std::cout << "The requested Tpetra send type "
1410  << DistributorSendTypeEnumToString(sendType)
1411  << " requires Distributor data to be ordered by"
1412  << " the receiving processor rank. Since these"
1413  << " data are not ordered, Tpetra will use Send"
1414  << " instead." << std::endl;
1415  }
1416 #endif
1417 
1418  Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
1419  maxNumPackets);
1420 
1421  Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
1422  size_t ioffset = 0;
1423  for (int j=0; j<numExportPacketsPerLID.size(); ++j) {
1424  indicesOffsets[j] = ioffset;
1425  ioffset += numExportPacketsPerLID[j];
1426  }
1427 
1428  for (size_t i = 0; i < numBlocks; ++i) {
1429  size_t p = i + procIndex;
1430  if (p > (numBlocks - 1)) {
1431  p -= numBlocks;
1432  }
1433 
1434  if (plan.getProcsTo()[p] != myProcID) {
1435  size_t sendArrayOffset = 0;
1436  size_t j = plan.getStartsTo()[p];
1437  size_t numPacketsTo_p = 0;
1438  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
1439  numPacketsTo_p += numExportPacketsPerLID[j];
1440  deep_copy_offset(sendArray, exports, sendArrayOffset,
1441  indicesOffsets[j], numExportPacketsPerLID[j]);
1442  sendArrayOffset += numExportPacketsPerLID[j];
1443  }
1444  typename ExpView::execution_space().fence();
1445 
1446  if (numPacketsTo_p > 0) {
1447  ImpView tmpSend =
1448  subview_offset(sendArray, size_t(0), numPacketsTo_p);
1449 
1450  send<int> (tmpSend,
1451  as<int> (tmpSend.size ()),
1452  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1453  }
1454  }
1455  else { // "Sending" the message to myself
1456  selfNum = p;
1457  selfIndex = plan.getStartsTo()[p];
1458  }
1459  }
1460 
1461  if (plan.hasSelfMessage()) {
1462  for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
1463  deep_copy_offset(imports, exports, selfReceiveOffset,
1464  indicesOffsets[selfIndex],
1465  numExportPacketsPerLID[selfIndex]);
1466  selfReceiveOffset += numExportPacketsPerLID[selfIndex];
1467  ++selfIndex;
1468  }
1469  }
1470  }
1471 }
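
// The receive-before-send ordering that doPosts uses above is standard MPI
// practice.  The sketch below shows the same pattern in bare MPI, stripped
// of the Distributor bookkeeping.  It is an illustration only: the function
// name and parameters are hypothetical and nothing in Tpetra calls it
// (mpi.h is already included above when HAVE_TPETRA_MPI is defined).
#ifdef HAVE_TPETRA_MPI
inline void
example_postReceivesThenSends (MPI_Comm comm, const int tag,
                               const int numRecvs,
                               const int procsFrom[],
                               char* const recvBufs[],
                               const int recvCounts[],
                               const int numSends,
                               const int procsTo[],
                               const char* const sendBufs[],
                               const int sendCounts[],
                               MPI_Request requests[])
{
  // 1. Post all nonblocking receives first, so that incoming messages are
  //    matched from the "posted queue" rather than piling up in the
  //    "unexpected queue".
  for (int i = 0; i < numRecvs; ++i) {
    MPI_Irecv (recvBufs[i], recvCounts[i], MPI_CHAR,
               procsFrom[i], tag, comm, &requests[i]);
  }
  // 2. Then do the (blocking) sends.  The const_cast keeps this valid for
  //    pre-MPI-3 MPI_Send signatures.
  for (int i = 0; i < numSends; ++i) {
    MPI_Send (const_cast<char*> (sendBufs[i]), sendCounts[i], MPI_CHAR,
              procsTo[i], tag, comm);
  }
  // 3. Wait for the receives to finish; in DistributorActor, doWaits()
  //    plays this role for the requests_ that doPosts accumulates.
  MPI_Waitall (numRecvs, requests, MPI_STATUSES_IGNORE);
}
#endif // HAVE_TPETRA_MPI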
1472 
1473 template <class ExpView, class ExpPacketsView, class ImpView, class ImpPacketsView>
1474 void DistributorActor::doPostsKokkos(const DistributorPlan& plan,
1475  const ExpView &exports,
1476  const ExpPacketsView &numExportPacketsPerLID,
1477  const ImpView &imports,
1478  const ImpPacketsView &numImportPacketsPerLID)
1479 {
1480  static_assert(areKokkosViews<ExpView, ImpView>,
1481  "Data arrays for DistributorActor::doPostsKokkos must be Kokkos::Views");
1482  static_assert(areKokkosViews<ExpPacketsView, ImpPacketsView>,
1483  "Num packets arrays for DistributorActor::doPostsKokkos must be Kokkos::Views");
1484  using Teuchos::Array;
1485  using Teuchos::as;
1486  using Teuchos::ireceive;
1487  using Teuchos::isend;
1488  using Teuchos::send;
1489  using Teuchos::TypeNameTraits;
1490  using std::endl;
1491  using Kokkos::Compat::create_const_view;
1492  using Kokkos::Compat::create_view;
1493  using Kokkos::Compat::subview_offset;
1494  using Kokkos::Compat::deep_copy_offset;
1495  using ExpExecSpace = typename ExpPacketsView::execution_space;
1496  using ImpExecSpace = typename ImpPacketsView::execution_space;
1497  typedef Array<size_t>::size_type size_type;
1498  typedef ExpView exports_view_type;
1499  typedef ImpView imports_view_type;
1500 
1501 #ifdef KOKKOS_ENABLE_CUDA
1502  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
1503  ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
1504  "Please do not use Tpetra::Distributor with UVM "
1505  "allocations. See GitHub issue #1088.");
1506 #endif // KOKKOS_ENABLE_CUDA
1507 
1508 #ifdef KOKKOS_ENABLE_SYCL
1509  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
1510  ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
1511  "Please do not use Tpetra::Distributor with SharedUSM "
1512  "allocations. See GitHub issue #1088 (corresponding to CUDA).");
1513 #endif // KOKKOS_ENABLE_SYCL
1514 
1515 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1516  Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_);
1517 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1518 
1519  // Run-time configurable parameters that come from the input
1520  // ParameterList set by setParameterList().
1521  const Details::EDistributorSendType sendType = plan.getSendType();
1522 
1523 #ifdef HAVE_TPETRA_MPI
1524  // All-to-all communication layout is quite different from
1525  // point-to-point, so we handle it separately.
1526  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
1527  doPostsAllToAllKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
1528  return;
1529  }
1530 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
1531  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL)
1532  {
1533  doPostsAllToAllKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
1534  return;
1535  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
1536  doPostsNbrAllToAllVKokkos(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
1537  return;
1538  }
1539 #endif
1540 
1541 #else // HAVE_TPETRA_MPI
1542  if (plan.hasSelfMessage()) {
1543  size_t packetsPerSend;
1544  Kokkos::parallel_reduce(Kokkos::RangePolicy<ExpExecSpace>(plan.getStartsTo()[0], plan.getStartsTo()[0]+plan.getLengthsTo()[0]), KOKKOS_LAMBDA(const size_t j, size_t& packets) {
1545  packets += numExportPacketsPerLID(j);
1546  }, packetsPerSend);
1547 
1548  deep_copy_offset(imports, exports, (size_t)0, (size_t)0, packetsPerSend);
1549  }
1550 #endif // HAVE_TPETRA_MPI
1551 
1552  const int myProcID = plan.getComm()->getRank ();
1553  size_t selfReceiveOffset = 0;
1554 
1555 #ifdef HAVE_TPETRA_DEBUG
1556  // Different messages may have different numbers of packets.
1557  size_t totalNumImportPackets = Kokkos::Experimental::reduce(ImpExecSpace(), numImportPacketsPerLID);
1558  TEUCHOS_TEST_FOR_EXCEPTION(
1559  imports.extent (0) < totalNumImportPackets, std::runtime_error,
1560  "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): The 'imports' array must have "
1561  "enough entries to hold the expected number of import packets. "
1562  "imports.extent(0) = " << imports.extent (0) << " < "
1563  "totalNumImportPackets = " << totalNumImportPackets << ".");
1564  TEUCHOS_TEST_FOR_EXCEPTION
1565  (requests_.size () != 0, std::logic_error, "Tpetra::Distributor::"
1566  "doPostsKokkos(4 args, Kokkos): Process " << myProcID << ": requests_.size () = "
1567  << requests_.size () << " != 0.");
1568 #endif // HAVE_TPETRA_DEBUG
1569  // Distributor uses requests_.size() as the number of outstanding
1570  // nonblocking message requests, so we resize to zero to maintain
1571  // this invariant.
1572  //
1573  // getNumReceives() does _not_ include the self message, if there is
1574  // one. Here, we do actually send a message to ourselves, so we
1575  // include any self message in the "actual" number of receives to
1576  // post.
1577  //
1578  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
1579  // doesn't (re)allocate its array of requests. That happens in
1580  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
1581  // demand), or Resize_().
1582  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
1583  as<size_type> (plan.hasSelfMessage() ? 1 : 0);
1584  requests_.resize (0);
1585 
1586  // Post the nonblocking receives. It's common MPI wisdom to post
1587  // receives before sends. In MPI terms, this means favoring
1588  // adding to the "posted queue" (of receive requests) over adding
1589  // to the "unexpected queue" (of arrived messages not yet matched
1590  // with a receive).
1591  {
1592 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1593  Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_);
1594 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1595 
1596  size_t curBufferOffset = 0;
1597  size_t curLIDoffset = 0;
1598  for (size_type i = 0; i < actualNumReceives; ++i) {
1599  size_t totalPacketsFrom_i = 0;
1600  Kokkos::parallel_reduce(Kokkos::RangePolicy<ImpExecSpace>(0, plan.getLengthsFrom()[i]), KOKKOS_LAMBDA(const size_t j, size_t& total) {
1601  total += numImportPacketsPerLID(curLIDoffset+j);
1602  }, totalPacketsFrom_i);
1603  // totalPacketsFrom_i is converted down to int, so make sure it can be represented
1604  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
1605  std::logic_error, "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): "
1606  "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large "
1607  "to be represented as int.");
1608  curLIDoffset += plan.getLengthsFrom()[i];
1609  if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
1610  // If my process is receiving these packet(s) from another
1611  // process (not a self-receive), and if there is at least
1612  // one packet to receive:
1613  //
1614  // 1. Set up the persisting view (recvBuf) into the imports
1615  // array, given the offset and size (total number of
1616  // packets from process getProcsFrom()[i]).
1617  // 2. Start the Irecv and save the resulting request.
1618  imports_view_type recvBuf =
1619  subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
1620  requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
1621  mpiTag_, *plan.getComm()));
1622  }
1623  else { // Receiving these packet(s) from myself
1624  selfReceiveOffset = curBufferOffset; // Remember the offset
1625  }
1626  curBufferOffset += totalPacketsFrom_i;
1627  }
1628  }
1629 
1630 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1631  Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_);
1632 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1633 
1634  // Set up views containing the starting offset into exports for each send and
1635  // the number of packets in each send (see the exclusive-scan sketch after this function).
1636  Kokkos::View<size_t*, Kokkos::DefaultHostExecutionSpace> sendPacketOffsets("sendPacketOffsets", plan.getNumSends());
1637  Kokkos::View<size_t*, Kokkos::DefaultHostExecutionSpace> packetsPerSend("packetsPerSend", plan.getNumSends());
1638  auto sendPacketOffsets_d = Kokkos::create_mirror_view(ExpExecSpace(), sendPacketOffsets);
1639  auto packetsPerSend_d = Kokkos::create_mirror_view(ExpExecSpace(), packetsPerSend);
1640 
1641  auto starts = Kokkos::Compat::getKokkosViewDeepCopy<ExpExecSpace>(plan.getStartsTo());
1642  auto lengths = Kokkos::Compat::getKokkosViewDeepCopy<ExpExecSpace>(plan.getLengthsTo());
1643 
1644  Kokkos::parallel_scan(Kokkos::RangePolicy<ExpExecSpace>(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, size_t& curPKToffset, bool final_pass) {
1645  if(final_pass) sendPacketOffsets_d(pp) = curPKToffset;
1646  size_t numPackets = 0;
1647  for(size_t j = starts(pp); j < starts(pp) + lengths(pp); j++) {
1648  numPackets += numExportPacketsPerLID(j);
1649  }
1650  if(final_pass) packetsPerSend_d(pp) = numPackets;
1651  curPKToffset += numPackets;
1652  });
1653 
1654  size_t maxNumPackets;
1655  Kokkos::parallel_reduce(Kokkos::RangePolicy<ExpExecSpace>(0, plan.getNumSends()), KOKKOS_LAMBDA(const size_t pp, size_t& max) {
1656  if(packetsPerSend_d(pp) > max) {
1657  max = packetsPerSend_d(pp);
1658  }
1659  }, Kokkos::Max<size_t>(maxNumPackets));
1660 
1661  // Each per-send packet count will be used as a message length, so make sure the largest can be represented as int
1662  TEUCHOS_TEST_FOR_EXCEPTION(maxNumPackets > size_t(INT_MAX),
1663  std::logic_error, "Tpetra::Distributor::doPostsKokkos(4 args, Kokkos): "
1664  "maxNumPackets = " << maxNumPackets << " is too large "
1665  "to be represented as int.");
1666 
1667  Kokkos::deep_copy(sendPacketOffsets, sendPacketOffsets_d);
1668  Kokkos::deep_copy(packetsPerSend, packetsPerSend_d);
1669 
1670  // Set up a scan through the getProcsTo() list, starting with higher-numbered
1671  // procs (should help balance message traffic), as in doPosts above.
1672  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
1673  size_t procIndex = 0;
1674  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) {
1675  ++procIndex;
1676  }
1677  if (procIndex == numBlocks) {
1678  procIndex = 0;
1679  }
1680 
1681  size_t selfNum = 0;
1682  size_t selfIndex = 0;
1683  if (plan.getIndicesTo().is_null()) {
1684 
1685 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1686  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_);
1687 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1688 
1689  // Data are already blocked (laid out) by process, so we don't
1690  // need a separate send buffer (besides the exports array).
1691  for (size_t i = 0; i < numBlocks; ++i) {
1692  size_t p = i + procIndex;
1693  if (p > (numBlocks - 1)) {
1694  p -= numBlocks;
1695  }
1696 
1697  if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) {
1698  exports_view_type tmpSend =
1699  subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);
1700 
1701  if (sendType == Details::DISTRIBUTOR_ISEND) {
1702  exports_view_type tmpSendBuf =
1703  subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
1704  requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
1705  mpiTag_, *plan.getComm()));
1706  }
1707  else { // DISTRIBUTOR_SEND
1708  send<int> (tmpSend,
1709  as<int> (tmpSend.size ()),
1710  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1711  }
1712  }
1713  else { // "Sending" the message to myself
1714  selfNum = p;
1715  }
1716  }
1717 
1718  if (plan.hasSelfMessage()) {
1719  deep_copy_offset(imports, exports, selfReceiveOffset,
1720  sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
1721  }
1722  }
1723  else { // data are not blocked by proc, use send buffer
1724 
1725 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1726  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_);
1727 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1728 
1729  // FIXME (mfh 05 Mar 2013) This may be broken for Isend.
1730  typedef typename ExpView::non_const_value_type Packet;
1731  typedef typename ExpView::array_layout Layout;
1732  typedef typename ExpView::device_type Device;
1733  typedef typename ExpView::memory_traits Mem;
1734 
1735  // This buffer is long enough for only one message at a time.
1736  // Thus, we always use DISTRIBUTOR_SEND in this case, regardless
1737  // of the sendType requested by the user.
1738  // This code path formerly errored out with the message:
1739  // Tpetra::Distributor::doPostsKokkos(4-arg, Kokkos):
1740  // The "send buffer" code path
1741  // doesn't currently work with nonblocking sends.
1742  // Now, we opt to just do the communication in a way that works.
1743 #ifdef HAVE_TPETRA_DEBUG
1744  if (sendType != Details::DISTRIBUTOR_SEND) {
1745  if (plan.getComm()->getRank() == 0)
1746  std::cout << "The requested Tpetra send type "
1747  << DistributorSendTypeEnumToString(sendType)
1748  << " requires Distributor data to be ordered by"
1749  << " the receiving processor rank. Since these"
1750  << " data are not ordered, Tpetra will use Send"
1751  << " instead." << std::endl;
1752  }
1753 #endif
1754 
1755  Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
1756  maxNumPackets);
1757 
1758  Kokkos::View<size_t*, ExpExecSpace> indicesOffsets ("indicesOffsets", numExportPacketsPerLID.extent(0));
1759  size_t ioffset = 0;
1760  Kokkos::parallel_scan(Kokkos::RangePolicy<ExpExecSpace>(0, numExportPacketsPerLID.extent(0)), KOKKOS_LAMBDA(const size_t j, size_t& offset, bool is_final) {
1761  if(is_final) indicesOffsets(j) = offset;
1762  offset += numExportPacketsPerLID(j);
1763  }, ioffset);
1764 
1765  for (size_t i = 0; i < numBlocks; ++i) {
1766  size_t p = i + procIndex;
1767  if (p > (numBlocks - 1)) {
1768  p -= numBlocks;
1769  }
1770 
1771  if (plan.getProcsTo()[p] != myProcID) {
1772  size_t j = plan.getStartsTo()[p];
1773  size_t numPacketsTo_p = 0;
1774  // Mirror in case the execution spaces differ.
1775  auto sendArrayMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), sendArray);
1776  auto exportsMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), exports);
1777  Kokkos::parallel_scan(Kokkos::RangePolicy<ExpExecSpace>(0, plan.getLengthsTo()[p]), KOKKOS_LAMBDA(const size_t k, size_t& offset, bool is_final) {
1778  if(is_final) {
1779  const size_t dst_end = offset + numExportPacketsPerLID(j + k);
1780  const size_t src_end = indicesOffsets(j + k) + numExportPacketsPerLID(j + k);
1781  auto dst_sub = Kokkos::subview(sendArrayMirror, Kokkos::make_pair(offset, dst_end));
1782  auto src_sub = Kokkos::subview(exportsMirror, Kokkos::make_pair(indicesOffsets(j + k), src_end));
1783  Kokkos::Experimental::local_deep_copy(dst_sub, src_sub);
1784  }
1785  offset += numExportPacketsPerLID(j + k);
1786  }, numPacketsTo_p);
1787  Kokkos::deep_copy(sendArray, sendArrayMirror);
1788  typename ExpView::execution_space().fence();
1789 
1790  if (numPacketsTo_p > 0) {
1791  ImpView tmpSend =
1792  subview_offset(sendArray, size_t(0), numPacketsTo_p);
1793 
1794  send<int> (tmpSend,
1795  as<int> (tmpSend.size ()),
1796  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1797  }
1798  }
1799  else { // "Sending" the message to myself
1800  selfNum = p;
1801  selfIndex = plan.getStartsTo()[p];
1802  }
1803  }
1804 
1805  if (plan.hasSelfMessage()) {
1806  // Mirror in case the execution spaces differ.
1807  auto importsMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), imports);
1808  auto exportsMirror = Kokkos::create_mirror_view_and_copy(ExpExecSpace(), exports);
1809  size_t temp;
1810  Kokkos::parallel_scan(Kokkos::RangePolicy<ExpExecSpace>(0, plan.getLengthsTo()[selfNum]), KOKKOS_LAMBDA(const size_t k, size_t& offset, bool is_final) {
1811  if(is_final) {
1812  const size_t dst_end = selfReceiveOffset + offset + numExportPacketsPerLID(selfIndex + k);
1813  const size_t src_end = indicesOffsets(selfIndex + k) + numExportPacketsPerLID(selfIndex + k);
1814  auto dst_sub = Kokkos::subview(importsMirror, Kokkos::make_pair(selfReceiveOffset + offset, dst_end));
1815  auto src_sub = Kokkos::subview(exportsMirror, Kokkos::make_pair(indicesOffsets(selfIndex + k), src_end));
1816  Kokkos::Experimental::local_deep_copy(dst_sub, src_sub);
1817  }
1818  offset += numExportPacketsPerLID(selfIndex + k);
1819  }, temp);
1820  Kokkos::deep_copy(imports, importsMirror);
1821  selfIndex += plan.getLengthsTo()[selfNum];
1822  selfReceiveOffset += temp;
1823  }
1824  }
1825 }
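
// The exclusive-scan idiom used twice in doPostsKokkos above (for
// sendPacketOffsets and for indicesOffsets), shown on its own.  This is a
// hedged sketch: the function name and template parameters are hypothetical,
// not part of the Tpetra interface.  'counts' and 'offsets' are assumed to be
// rank-1 size_t Views accessible from ExecSpace.
template <class ExecSpace, class CountView, class OffsetView>
size_t
example_exclusiveOffsets (const CountView& counts, const OffsetView& offsets)
{
  size_t total = 0;
  Kokkos::parallel_scan (Kokkos::RangePolicy<ExecSpace> (0, counts.extent (0)),
    KOKKOS_LAMBDA (const size_t i, size_t& update, const bool final_pass) {
      // On the final pass, 'update' holds the sum of counts(0..i-1),
      // i.e. the exclusive prefix sum, so write it out as the offset.
      if (final_pass) {
        offsets(i) = update;
      }
      update += counts(i);
    }, total);
  // 'total' is the sum of all counts; doPostsKokkos uses the analogous value
  // as the running packet offset (curPKToffset / ioffset).  For example,
  // counts = {4, 1, 2, 2, 3} gives offsets = {0, 4, 5, 7, 9} and total = 12.
  return total;
}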
1826 
1827 }
1828 }
1829 
1830 #endif