Tpetra_Details_DistributorActor.hpp
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 // clang-format off
11 
12 #ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
13 #define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
14 
16 #include "Tpetra_Util.hpp"
17 
18 #include "Teuchos_Array.hpp"
19 #include "Teuchos_Comm.hpp"
21 #include "Teuchos_RCP.hpp"
22 #include "Teuchos_Time.hpp"
23 
24 #include "Kokkos_TeuchosCommAdapters.hpp"
25 
26 #ifdef HAVE_TPETRA_MPI
27 #include "mpi.h"
28 #endif
29 
30 namespace Tpetra {
31 namespace Details {
32 
33 template <class View1, class View2>
34 constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;
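// Added note: convenience variable template used by the static_asserts below;
// it is true only when both template arguments are Kokkos::View specializations.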
35 
36 class DistributorActor {
37  static constexpr int DEFAULT_MPI_TAG = 1;
38 
39 public:
40  DistributorActor();
41  DistributorActor(const DistributorActor& otherActor);
42 
43  template <class ExpView, class ImpView>
44  void doPostsAndWaits(const DistributorPlan& plan,
45  const ExpView &exports,
46  size_t numPackets,
47  const ImpView &imports);
48 
49  template <class ExpView, class ImpView>
50  void doPostsAndWaits(const DistributorPlan& plan,
51  const ExpView &exports,
52  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
53  const ImpView &imports,
54  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
55 
56  template <class ExpView, class ImpView>
57  void doPosts(const DistributorPlan& plan,
58  const ExpView& exports,
59  size_t numPackets,
60  const ImpView& imports);
61 
62  template <class ExpView, class ImpView>
63  void doPosts(const DistributorPlan& plan,
64  const ExpView &exports,
65  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
66  const ImpView &imports,
67  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
68 
69  void doWaits(const DistributorPlan& plan);
70 
71  void doWaitsRecv(const DistributorPlan& plan);
72 
73  void doWaitsSend(const DistributorPlan& plan);
74 
75  bool isReady() const;
76 
77 private:
78 // clang-format on
79 #ifdef HAVE_TPETRA_MPI
80  template <class ExpView, class ImpView>
81  void doPostsAllToAll(const DistributorPlan &plan, const ExpView &exports,
82  size_t numPackets, const ImpView &imports);
83 
84  template <class ExpView, class ImpView>
85  void doPostsAllToAll(
86  const DistributorPlan &plan, const ExpView &exports,
87  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
88  const ImpView &imports,
89  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
90 
91 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
92  template <class ExpView, class ImpView>
93  void doPostsNbrAllToAllV(const DistributorPlan &plan, const ExpView &exports,
94  size_t numPackets, const ImpView &imports);
95 
96  template <class ExpView, class ImpView>
97  void doPostsNbrAllToAllV(
98  const DistributorPlan &plan, const ExpView &exports,
99  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
100  const ImpView &imports,
101  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
102 #endif // HAVE_TPETRACORE_MPI_ADVANCE
103 #endif // HAVE_TPETRA_MPI
104  // clang-format off
105  int mpiTag_;
106 
107  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsRecv_;
108  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsSend_;
109 
110 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
111  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_;
112  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_;
113  Teuchos::RCP<Teuchos::Time> timer_doWaitsRecv_;
114  Teuchos::RCP<Teuchos::Time> timer_doWaitsSend_;
115  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_recvs_;
116  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_recvs_;
117  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_barrier_;
118  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_barrier_;
119  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_;
120  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_;
121  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_slow_;
122  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_slow_;
123  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_fast_;
124  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_fast_;
125 
127  void makeTimers();
128 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
129 };
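// Illustrative usage sketch (added comment, not part of the original header).
// A DistributorActor executes the communication described by a DistributorPlan.
// The plan setup and View sizes below are assumptions for the example; only the
// doPostsAndWaits call mirrors the interface declared above.
//
//   Tpetra::Details::DistributorPlan plan(comm);   // hypothetical setup
//   // ... populate the plan (e.g., from a list of destination ranks) ...
//   Tpetra::Details::DistributorActor actor;
//   Kokkos::View<double*> exports("exports", numExportPackets);  // assumed sizes
//   Kokkos::View<double*> imports("imports", numImportPackets);
//   actor.doPostsAndWaits(plan, exports, 1 /* numPackets per LID */, imports);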
130 
131 template <class ExpView, class ImpView>
132 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
133  const ExpView& exports,
134  size_t numPackets,
135  const ImpView& imports)
136 {
137  static_assert(areKokkosViews<ExpView, ImpView>,
138  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
139  doPosts(plan, exports, numPackets, imports);
140  doWaits(plan);
141 }
142 
143 template <class ExpView, class ImpView>
144 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
145  const ExpView& exports,
146  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
147  const ImpView& imports,
148  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
149 {
150  static_assert(areKokkosViews<ExpView, ImpView>,
151  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
152  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
153  doWaits(plan);
154 }
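// Illustrative sketch (added comment) of overlapping communication with
// computation via the split doPosts / doWaits interface; localWork() is a
// placeholder, not a Tpetra function:
//
//   actor.doPosts(plan, exports, numPackets, imports);
//   localWork();           // computation that does not read 'imports'
//   actor.doWaits(plan);   // after this, 'imports' holds the received data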
155 
156 template <typename ViewType>
157 using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;
158 
159 template <typename DstViewType, typename SrcViewType>
160 using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
161  HostAccessibility<SrcViewType>::accessible>;
162 
163 template <typename DstViewType, typename SrcViewType>
164 using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
165  !HostAccessibility<SrcViewType>::accessible>;
166 
167 template <typename DstViewType, typename SrcViewType>
168 enableIfHostAccessible<DstViewType, SrcViewType>
169 packOffset(const DstViewType& dst,
170  const SrcViewType& src,
171  const size_t dst_offset,
172  const size_t src_offset,
173  const size_t size)
174 {
175  memcpy((void*) (dst.data()+dst_offset), src.data()+src_offset, size*sizeof(typename DstViewType::value_type));
176 }
177 
178 template <typename DstViewType, typename SrcViewType>
179 enableIfNotHostAccessible<DstViewType, SrcViewType>
180 packOffset(const DstViewType& dst,
181  const SrcViewType& src,
182  const size_t dst_offset,
183  const size_t src_offset,
184  const size_t size)
185 {
186  Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
187 }
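// Added note: the two packOffset overloads above dispatch on host accessibility
// of both Views. If both are accessible from the default host execution space,
// the copy is a raw memcpy; otherwise it falls back to
// Kokkos::Compat::deep_copy_offset. For example (types assumed for
// illustration), two Kokkos::View<double*, Kokkos::HostSpace> arguments resolve
// to the memcpy overload, while a device View on either side selects the
// deep_copy_offset overload.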
188 
189 // clang-format on
190 #ifdef HAVE_TPETRA_MPI
191 template <class ExpView, class ImpView>
192 void DistributorActor::doPostsAllToAll(const DistributorPlan &plan,
193  const ExpView &exports,
194  size_t numPackets,
195  const ImpView &imports) {
196  using size_type = Teuchos::Array<size_t>::size_type;
197 
198  TEUCHOS_TEST_FOR_EXCEPTION(
199  !plan.getIndicesTo().is_null(), std::runtime_error,
200  "Send Type=\"Alltoall\" only works for fast-path communication.");
201 
202  auto comm = plan.getComm();
203  const int myRank = comm->getRank();
204  std::vector<int> sendcounts(comm->getSize(), 0);
205  std::vector<int> sdispls(comm->getSize(), 0);
206  std::vector<int> recvcounts(comm->getSize(), 0);
207  std::vector<int> rdispls(comm->getSize(), 0);
208 
209  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
210  for (size_t p = 0; p < numBlocks; ++p) {
211  sdispls[plan.getProcsTo()[p]] = plan.getStartsTo()[p] * numPackets;
212  size_t sendcount = plan.getLengthsTo()[p] * numPackets;
213  // sendcount is converted down to int, so make sure it can be represented
214  TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
215  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
216  "Send count for block "
217  << p << " (" << sendcount
218  << ") is too large "
219  "to be represented as int.");
220  sendcounts[plan.getProcsTo()[p]] = static_cast<int>(sendcount);
221  }
222 
223  const size_type actualNumReceives =
224  Teuchos::as<size_type>(plan.getNumReceives()) +
225  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
226  size_t curBufferOffset = 0;
227  for (size_type i = 0; i < actualNumReceives; ++i) {
228  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
229  TEUCHOS_TEST_FOR_EXCEPTION(
230  curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
231  std::logic_error,
232  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
233  "Exceeded size of 'imports' array in packing loop on Process "
234  << myRank << ". imports.size() = " << imports.size()
235  << " < "
236  "curBufferOffset("
237  << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
238  rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
239  // curBufLen is converted down to int, so make sure it can be represented
240  TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
241  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
242  "Recv count for receive "
243  << i << " (" << curBufLen
244  << ") is too large "
245  "to be represented as int.");
246  recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(curBufLen);
247  curBufferOffset += curBufLen;
248  }
249 
250  using T = typename ExpView::non_const_value_type;
251  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
252 
253 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
254  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
255  MPIX_Comm *mpixComm = *plan.getMPIXComm();
256  TEUCHOS_TEST_FOR_EXCEPTION(
257  !mpixComm, std::runtime_error,
258  "plan's MPIX_Comm null in doPostsAllToAll, but "
259  "DISTRIBUTOR_MPIADVANCE_ALLTOALL set: plan.howInitialized()="
260  << DistributorHowInitializedEnumToString(plan.howInitialized()));
261 
262  const int err = MPIX_Alltoallv(
263  exports.data(), sendcounts.data(), sdispls.data(), rawType,
264  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
265 
266  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
267  "MPIX_Alltoallv failed with error \""
268  << Teuchos::mpiErrorCodeToString(err)
269  << "\".");
270 
271  return;
272  }
273 #endif
274  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
275  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
276  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
277  mpiComm->getRawMpiComm();
278 
279  const int err = MPI_Alltoallv(
280  exports.data(), sendcounts.data(), sdispls.data(), rawType,
281  imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
282 
283  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
284  "MPI_Alltoallv failed with error \""
285  << Teuchos::mpiErrorCodeToString(err)
286  << "\".");
287 
288  return;
289 }
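// Worked example (added comment) of the count/displacement setup above, with
// assumed plan data: numPackets = 2, getProcsTo() = {1, 3},
// getStartsTo() = {0, 4}, getLengthsTo() = {4, 3}. Then rank 1 is sent
// sendcounts[1] = 8 values starting at sdispls[1] = 0, and rank 3 is sent
// sendcounts[3] = 6 values starting at sdispls[3] = 8; recvcounts/rdispls are
// filled analogously from getLengthsFrom() and the running import-buffer offset.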
290 
291 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
292 template <class ExpView, class ImpView>
293 void DistributorActor::doPostsNbrAllToAllV(const DistributorPlan &plan,
294  const ExpView &exports,
295  size_t numPackets,
296  const ImpView &imports) {
297  TEUCHOS_TEST_FOR_EXCEPTION(
298  !plan.getIndicesTo().is_null(), std::runtime_error,
299  "Send Type=\"Alltoall\" only works for fast-path communication.");
300 
301  const int myRank = plan.getComm()->getRank();
302  MPIX_Comm *mpixComm = *plan.getMPIXComm();
303 
304  const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
305  const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
306  std::vector<int> sendcounts(numSends, 0);
307  std::vector<int> sdispls(numSends, 0);
308  std::vector<int> recvcounts(numRecvs, 0);
309  std::vector<int> rdispls(numRecvs, 0);
310 
311  for (size_t p = 0; p < numSends; ++p) {
312  sdispls[p] = plan.getStartsTo()[p] * numPackets;
313  const size_t sendcount = plan.getLengthsTo()[p] * numPackets;
314  // sendcount is converted down to int, so make sure it can be represented
315  TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
316  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
317  "Send count for block "
318  << p << " (" << sendcount
319  << ") is too large "
320  "to be represented as int.");
321  sendcounts[p] = static_cast<int>(sendcount);
322  }
323 
324  size_t curBufferOffset = 0;
325  for (size_t i = 0; i < numRecvs; ++i) {
326  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
327  TEUCHOS_TEST_FOR_EXCEPTION(
328  curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
329  std::logic_error,
330  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
331  "Exceeded size of 'imports' array in packing loop on Process "
332  << myRank << ". imports.size() = " << imports.size()
333  << " < "
334  "curBufferOffset("
335  << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
336  rdispls[i] = curBufferOffset;
337  // curBufLen is converted down to int, so make sure it can be represented
338  TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
339  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
340  "Recv count for receive "
341  << i << " (" << curBufLen
342  << ") is too large "
343  "to be represented as int.");
344  recvcounts[i] = static_cast<int>(curBufLen);
345  curBufferOffset += curBufLen;
346  }
347 
348  using T = typename ExpView::non_const_value_type;
349  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
350 
351  const int err = MPIX_Neighbor_alltoallv(
352  exports.data(), sendcounts.data(), sdispls.data(), rawType,
353  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
354 
355  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
356  "MPIX_Neighbor_alltoallv failed with error \""
357  << Teuchos::mpiErrorCodeToString(err)
358  << "\".");
359 }
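// Added note: unlike MPI_Alltoallv above, the neighbor collective here indexes
// sendcounts/sdispls and recvcounts/rdispls by position in the plan's
// send/receive lists (the i-th neighbor), not by global rank.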
360 #endif // HAVE_TPETRACORE_MPI_ADVANCE
361 #endif // HAVE_TPETRA_MPI
362 // clang-format off
363 
364 template <class ExpView, class ImpView>
365 void DistributorActor::doPosts(const DistributorPlan& plan,
366  const ExpView& exports,
367  size_t numPackets,
368  const ImpView& imports)
369 {
370  static_assert(areKokkosViews<ExpView, ImpView>,
371  "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
372  using Teuchos::Array;
373  using Teuchos::as;
374  using Teuchos::FancyOStream;
375  using Teuchos::includesVerbLevel;
376  using Teuchos::ireceive;
377  using Teuchos::isend;
378  using Teuchos::send;
379  using Teuchos::TypeNameTraits;
380  using Teuchos::typeName;
381  using std::endl;
382  using Kokkos::Compat::create_const_view;
383  using Kokkos::Compat::create_view;
384  using Kokkos::Compat::subview_offset;
385  using Kokkos::Compat::deep_copy_offset;
386  typedef Array<size_t>::size_type size_type;
387  typedef ExpView exports_view_type;
388  typedef ImpView imports_view_type;
389 
390 #ifdef KOKKOS_ENABLE_CUDA
391  static_assert
392  (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
393  ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
394  "Please do not use Tpetra::Distributor with UVM allocations. "
395  "See Trilinos GitHub issue #1088.");
396 #endif // KOKKOS_ENABLE_CUDA
397 
398 #ifdef KOKKOS_ENABLE_SYCL
399  static_assert
400  (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
401  ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
402  "Please do not use Tpetra::Distributor with SharedUSM allocations. "
403  "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
404 #endif // KOKKOS_ENABLE_SYCL
405 
406 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
407  Teuchos::TimeMonitor timeMon (*timer_doPosts3KV_);
408 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
409 
410  const int myRank = plan.getComm()->getRank ();
411  // Run-time configurable parameters that come from the input
412  // ParameterList set by setParameterList().
413  const Details::EDistributorSendType sendType = plan.getSendType();
414 
415 // clang-format on
416 #if defined(HAVE_TPETRA_MPI)
417  // All-to-all communication layout is quite different from
418  // point-to-point, so we handle it separately.
419 
420  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
421  doPostsAllToAll(plan, exports,numPackets, imports);
422  return;
423  }
424 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
425  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
426  doPostsAllToAll(plan, exports,numPackets, imports);
427  return;
428  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
429  doPostsNbrAllToAllV(plan, exports,numPackets, imports);
430  return;
431  }
432 #endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)
433 // clang-format off
434 
435 #else // HAVE_TPETRA_MPI
436  if (plan.hasSelfMessage()) {
437  // This is how we "send a message to ourself": we copy from
438  // the export buffer to the import buffer. That saves
439  // Teuchos::Comm implementations other than MpiComm (in
440  // particular, SerialComm) the trouble of implementing self
441  // messages correctly. (To do this right, SerialComm would
442  // need internal buffer space for messages, keyed on the
443  // message's tag.)
444  size_t selfReceiveOffset = 0;
445  deep_copy_offset(imports, exports, selfReceiveOffset,
446  plan.getStartsTo()[0]*numPackets,
447  plan.getLengthsTo()[0]*numPackets);
448  }
449  // should we just return here?
450  // likely not as comm could be a serial comm
451 #endif // HAVE_TPETRA_MPI
452 
453  size_t selfReceiveOffset = 0;
454 
455 #ifdef HAVE_TPETRA_DEBUG
456  TEUCHOS_TEST_FOR_EXCEPTION
457  (requestsRecv_.size () != 0,
458  std::logic_error,
459  "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
460  << myRank << ": requestsRecv_.size() = " << requestsRecv_.size () << " != 0.");
461  TEUCHOS_TEST_FOR_EXCEPTION
462  (requestsSend_.size () != 0,
463  std::logic_error,
464  "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
465  << myRank << ": requestsSend_.size() = " << requestsSend_.size () << " != 0.");
466 #endif // HAVE_TPETRA_DEBUG
467 
468  // Distributor uses requestsRecv_.size() and requestsSend_.size()
469  // as the number of outstanding nonblocking message requests, so
470  // we resize to zero to maintain this invariant.
471  //
472  // getNumReceives() does _not_ include the self message, if there is
473  // one. Here, we do actually send a message to ourselves, so we
474  // include any self message in the "actual" number of receives to
475  // post.
476  //
477  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
478  // doesn't (re)allocate its array of requests. That happens in
479  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
480  // demand), or Resize_().
481  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
482  as<size_type> (plan.hasSelfMessage() ? 1 : 0);
483  requestsRecv_.resize (0);
484  requestsSend_.resize (0);
485 
486  // Post the nonblocking receives. It's common MPI wisdom to post
487  // receives before sends. In MPI terms, this means favoring
488  // adding to the "posted queue" (of receive requests) over adding
489  // to the "unexpected queue" (of arrived messages not yet matched
490  // with a receive).
491  {
492 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
493  Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3KV_recvs_);
494 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
495 
496  size_t curBufferOffset = 0;
497  for (size_type i = 0; i < actualNumReceives; ++i) {
498  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
499  if (plan.getProcsFrom()[i] != myRank) {
500  // If my process is receiving these packet(s) from another
501  // process (not a self-receive):
502  //
503  // 1. Set up the persisting view (recvBuf) of the imports
504  // array, given the offset and size (total number of
505  // packets from process getProcsFrom()[i]).
506  // 2. Start the Irecv and save the resulting request.
507  TEUCHOS_TEST_FOR_EXCEPTION(
508  curBufferOffset + curBufLen > static_cast<size_t> (imports.size ()),
509  std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
510  "Exceeded size of 'imports' array in packing loop on Process " <<
511  myRank << ". imports.size() = " << imports.size () << " < "
512  "curBufferOffset(" << curBufferOffset << ") + curBufLen(" <<
513  curBufLen << ").");
514  imports_view_type recvBuf =
515  subview_offset (imports, curBufferOffset, curBufLen);
516  requestsRecv_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
517  mpiTag_, *plan.getComm()));
518  }
519  else { // Receiving from myself
520  selfReceiveOffset = curBufferOffset; // Remember the self-recv offset
521  }
522  curBufferOffset += curBufLen;
523  }
524  }
525 
526 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
527  Teuchos::TimeMonitor timeMonSends (*timer_doPosts3KV_sends_);
528 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
529 
530  // setup scan through getProcsTo() list starting with higher numbered procs
531  // (should help balance message traffic)
532  //
533  // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
534  // It doesn't depend on the input at all.
535  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
536  size_t procIndex = 0;
537  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
538  ++procIndex;
539  }
540  if (procIndex == numBlocks) {
541  procIndex = 0;
542  }
543 
544  size_t selfNum = 0;
545  size_t selfIndex = 0;
546 
547  if (plan.getIndicesTo().is_null()) {
548 
549 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
550  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_fast_);
551 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
552 
553  // Data are already blocked (laid out) by process, so we don't
554  // need a separate send buffer (besides the exports array).
555  for (size_t i = 0; i < numBlocks; ++i) {
556  size_t p = i + procIndex;
557  if (p > (numBlocks - 1)) {
558  p -= numBlocks;
559  }
560 
561  if (plan.getProcsTo()[p] != myRank) {
562  exports_view_type tmpSend = subview_offset(
563  exports, plan.getStartsTo()[p]*numPackets, plan.getLengthsTo()[p]*numPackets);
564 
565  if (sendType == Details::DISTRIBUTOR_ISEND) {
566  // NOTE: This looks very similar to the tmpSend above, but removing
567  // tmpSendBuf and using tmpSend instead leads to a performance hit on
568  // Arm SerialNode builds.
569  exports_view_type tmpSendBuf =
570  subview_offset (exports, plan.getStartsTo()[p] * numPackets,
571  plan.getLengthsTo()[p] * numPackets);
572  requestsSend_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
573  mpiTag_, *plan.getComm()));
574  }
575  else { // DISTRIBUTOR_SEND
576  send<int> (tmpSend,
577  as<int> (tmpSend.size ()),
578  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
579  }
580  }
581  else { // "Sending" the message to myself
582  selfNum = p;
583  }
584  }
585 
586  if (plan.hasSelfMessage()) {
587  // This is how we "send a message to ourself": we copy from
588  // the export buffer to the import buffer. That saves
589  // Teuchos::Comm implementations other than MpiComm (in
590  // particular, SerialComm) the trouble of implementing self
591  // messages correctly. (To do this right, SerialComm would
592  // need internal buffer space for messages, keyed on the
593  // message's tag.)
594  deep_copy_offset(imports, exports, selfReceiveOffset,
595  plan.getStartsTo()[selfNum]*numPackets,
596  plan.getLengthsTo()[selfNum]*numPackets);
597  }
598 
599  }
600  else { // data are not blocked by proc, use send buffer
601 
602 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
603  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_slow_);
604 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
605 
606  typedef typename ExpView::non_const_value_type Packet;
607  typedef typename ExpView::array_layout Layout;
608  typedef typename ExpView::device_type Device;
609  typedef typename ExpView::memory_traits Mem;
610 
611  // This buffer is long enough for only one message at a time.
612  // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
613  // of sendType requested by user.
614  // This code path formerly errored out with message:
615  // Tpetra::Distributor::doPosts(3 args, Kokkos):
616  // The "send buffer" code path
617  // doesn't currently work with nonblocking sends.
618  // Now, we opt to just do the communication in a way that works.
619 #ifdef HAVE_TPETRA_DEBUG
620  if (sendType != Details::DISTRIBUTOR_SEND) {
621  if (plan.getComm()->getRank() == 0)
622  std::cout << "The requested Tpetra send type "
623  << DistributorSendTypeEnumToString(sendType)
624  << " requires Distributor data to be ordered by"
625  << " the receiving processor rank. Since these"
626  << " data are not ordered, Tpetra will use Send"
627  << " instead." << std::endl;
628  }
629 #endif
630 
631  Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
632  plan.getMaxSendLength() * numPackets);
633 
634  for (size_t i = 0; i < numBlocks; ++i) {
635  size_t p = i + procIndex;
636  if (p > (numBlocks - 1)) {
637  p -= numBlocks;
638  }
639 
640  if (plan.getProcsTo()[p] != myRank) {
641  size_t sendArrayOffset = 0;
642  size_t j = plan.getStartsTo()[p];
643  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
644  packOffset(sendArray, exports, sendArrayOffset, plan.getIndicesTo()[j]*numPackets, numPackets);
645  sendArrayOffset += numPackets;
646  }
647  typename ExpView::execution_space().fence();
648 
649  ImpView tmpSend =
650  subview_offset(sendArray, size_t(0), plan.getLengthsTo()[p]*numPackets);
651 
652  send<int> (tmpSend,
653  as<int> (tmpSend.size ()),
654  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
655  }
656  else { // "Sending" the message to myself
657  selfNum = p;
658  selfIndex = plan.getStartsTo()[p];
659  }
660  }
661 
662  if (plan.hasSelfMessage()) {
663  for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
664  packOffset(imports, exports, selfReceiveOffset, plan.getIndicesTo()[selfIndex]*numPackets, numPackets);
665  ++selfIndex;
666  selfReceiveOffset += numPackets;
667  }
668  }
669  }
670 }
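// Added summary of the two point-to-point paths above: when getIndicesTo() is
// null the exports are already grouped by destination, so each block is sent
// (or Isend-ed) directly out of 'exports'; otherwise a single staging buffer of
// getMaxSendLength()*numPackets entries is repacked per destination and a
// blocking send is used regardless of the requested send type.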
671 
672 // clang-format on
673 #ifdef HAVE_TPETRA_MPI
674 template <class ExpView, class ImpView>
675 void DistributorActor::doPostsAllToAll(
676  const DistributorPlan &plan, const ExpView &exports,
677  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
678  const ImpView &imports,
679  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
680  TEUCHOS_TEST_FOR_EXCEPTION(
681  !plan.getIndicesTo().is_null(), std::runtime_error,
682  "Send Type=\"Alltoall\" only works for fast-path communication.");
683 
684  using size_type = Teuchos::Array<size_t>::size_type;
685 
686  auto comm = plan.getComm();
687  std::vector<int> sendcounts(comm->getSize(), 0);
688  std::vector<int> sdispls(comm->getSize(), 0);
689  std::vector<int> recvcounts(comm->getSize(), 0);
690  std::vector<int> rdispls(comm->getSize(), 0);
691 
692  size_t curPKToffset = 0;
693  for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
694  sdispls[plan.getProcsTo()[pp]] = curPKToffset;
695  size_t numPackets = 0;
696  for (size_t j = plan.getStartsTo()[pp];
697  j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
698  numPackets += numExportPacketsPerLID[j];
699  }
700  // numPackets is converted down to int, so make sure it can be represented
701  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
702  "Tpetra::Distributor::doPosts(4 args, Kokkos): "
703  "Send count for send "
704  << pp << " (" << numPackets
705  << ") is too large "
706  "to be represented as int.");
707  sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
708  curPKToffset += numPackets;
709  }
710 
711  const size_type actualNumReceives =
712  Teuchos::as<size_type>(plan.getNumReceives()) +
713  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
714 
715  size_t curBufferOffset = 0;
716  size_t curLIDoffset = 0;
717  for (size_type i = 0; i < actualNumReceives; ++i) {
718  size_t totalPacketsFrom_i = 0;
719  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
720  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
721  }
722  curLIDoffset += plan.getLengthsFrom()[i];
723 
724  rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
725  // totalPacketsFrom_i is converted down to int, so make sure it can be
726  // represented
727  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
728  std::logic_error,
729  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
730  "Recv count for receive "
731  << i << " (" << totalPacketsFrom_i
732  << ") is too large "
733  "to be represented as int.");
734  recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
735  curBufferOffset += totalPacketsFrom_i;
736  }
737 
738  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
739  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
740  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
741  mpiComm->getRawMpiComm();
742  using T = typename ExpView::non_const_value_type;
743  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
744 
745 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
746  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
747  MPIX_Comm *mpixComm = *plan.getMPIXComm();
748  TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
749  "MPIX_Comm is null in doPostsAllToAll \""
750  << __FILE__ << ":" << __LINE__);
751 
752  const int err = MPIX_Alltoallv(
753  exports.data(), sendcounts.data(), sdispls.data(), rawType,
754  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
755 
756  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
757  "MPIX_Alltoallv failed with error \""
758  << Teuchos::mpiErrorCodeToString(err)
759  << "\".");
760 
761  return;
762  }
763 #endif // HAVE_TPETRACORE_MPI_ADVANCE
764 
765  const int err = MPI_Alltoallv(
766  exports.data(), sendcounts.data(), sdispls.data(), rawType,
767  imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
768 
769  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
770  "MPI_Alltoallv failed with error \""
771  << Teuchos::mpiErrorCodeToString(err)
772  << "\".");
773 }
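// Added note: in this 4-argument variant the per-destination send count is the
// sum of numExportPacketsPerLID over that destination's contiguous block of
// LIDs; e.g. a block with per-LID counts {2, 0, 3} contributes a send count of
// 5 and advances the export-buffer offset by the same amount.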
774 
775 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
776 template <class ExpView, class ImpView>
777 void DistributorActor::doPostsNbrAllToAllV(
778  const DistributorPlan &plan, const ExpView &exports,
779  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
780  const ImpView &imports,
781  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
782  TEUCHOS_TEST_FOR_EXCEPTION(
783  !plan.getIndicesTo().is_null(), std::runtime_error,
784  "Send Type=\"Alltoall\" only works for fast-path communication.");
785 
786  const Teuchos_Ordinal numSends = plan.getProcsTo().size();
787  const Teuchos_Ordinal numRecvs = plan.getProcsFrom().size();
788 
789  auto comm = plan.getComm();
790  std::vector<int> sendcounts(numSends, 0);
791  std::vector<int> sdispls(numSends, 0);
792  std::vector<int> recvcounts(numRecvs, 0);
793  std::vector<int> rdispls(numRecvs, 0);
794 
795  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
796  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
797  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
798  mpiComm->getRawMpiComm();
799  using T = typename ExpView::non_const_value_type;
800  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
801 
802  // unlike standard alltoall, entry `i` in sdispls and sendcounts
803  // refer to the ith participating rank, rather than rank i
804  size_t curPKToffset = 0;
805  for (Teuchos_Ordinal pp = 0; pp < numSends; ++pp) {
806  sdispls[pp] = curPKToffset;
807  size_t numPackets = 0;
808  for (size_t j = plan.getStartsTo()[pp];
809  j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
810  numPackets += numExportPacketsPerLID[j];
811  }
812  // numPackets is converted down to int, so make sure it can be represented
813  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
814  "Tpetra::Distributor::doPosts(4 args, Kokkos): "
815  "Send count for send "
816  << pp << " (" << numPackets
817  << ") is too large "
818  "to be represented as int.");
819  sendcounts[pp] = static_cast<int>(numPackets);
820  curPKToffset += numPackets;
821  }
822  size_t curBufferOffset = 0;
823  size_t curLIDoffset = 0;
824  for (Teuchos_Ordinal i = 0; i < numRecvs; ++i) {
825  size_t totalPacketsFrom_i = 0;
826  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
827  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
828  }
829  curLIDoffset += plan.getLengthsFrom()[i];
830 
831  rdispls[i] = curBufferOffset;
832  // totalPacketsFrom_i is converted down to int, so make sure it can be
833  // represented
834  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
835  std::logic_error,
836  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
837  "Recv count for receive "
838  << i << " (" << totalPacketsFrom_i
839  << ") is too large "
840  "to be represented as int.");
841  recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
842  curBufferOffset += totalPacketsFrom_i;
843  }
844 
845  MPIX_Comm *mpixComm = *plan.getMPIXComm();
846  const int err = MPIX_Neighbor_alltoallv(
847  exports.data(), sendcounts.data(), sdispls.data(), rawType,
848  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
849 
850  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
851  "MPIX_Neighbor_alltoallv failed with error \""
852  << Teuchos::mpiErrorCodeToString(err)
853  << "\".");
854 }
855 #endif // HAVE_TPETRACORE_MPI_ADVANCE
856 #endif // HAVE_TPETRA_MPI
857  // clang-format off
858 
859 template <class ExpView, class ImpView>
860 void DistributorActor::doPosts(const DistributorPlan& plan,
861  const ExpView &exports,
862  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
863  const ImpView &imports,
864  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
865 {
866  static_assert(areKokkosViews<ExpView, ImpView>,
867  "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
868  using Teuchos::Array;
869  using Teuchos::as;
870  using Teuchos::ireceive;
871  using Teuchos::isend;
872  using Teuchos::send;
873  using Teuchos::TypeNameTraits;
874  using std::endl;
875  using Kokkos::Compat::create_const_view;
876  using Kokkos::Compat::create_view;
877  using Kokkos::Compat::subview_offset;
878  using Kokkos::Compat::deep_copy_offset;
879  typedef Array<size_t>::size_type size_type;
880  typedef ExpView exports_view_type;
881  typedef ImpView imports_view_type;
882 
883 #ifdef KOKKOS_ENABLE_CUDA
884  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
885  ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
886  "Please do not use Tpetra::Distributor with UVM "
887  "allocations. See GitHub issue #1088.");
888 #endif // KOKKOS_ENABLE_CUDA
889 
890 #ifdef KOKKOS_ENABLE_SYCL
891  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
892  ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
893  "Please do not use Tpetra::Distributor with SharedUSM "
894  "allocations. See GitHub issue #1088 (corresponding to CUDA).");
895 #endif // KOKKOS_ENABLE_SYCL
896 
897 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
898  Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_);
899 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
900 
901  // Run-time configurable parameters that come from the input
902  // ParameterList set by setParameterList().
903  const Details::EDistributorSendType sendType = plan.getSendType();
904 
905 #ifdef HAVE_TPETRA_MPI
906  // All-to-all communication layout is quite different from
907  // point-to-point, so we handle it separately.
908  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
909  doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
910  return;
911  }
912 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
913  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL)
914  {
915  doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
916  return;
917  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
918  doPostsNbrAllToAllV(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
919  return;
920  }
921 #endif
922 
923 #else // HAVE_TPETRA_MPI
924  if (plan.hasSelfMessage()) {
925 
926  size_t selfReceiveOffset = 0;
927 
928  // setup arrays containing starting-offsets into exports for each send,
929  // and num-packets-to-send for each send.
930  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
931  size_t maxNumPackets = 0;
932  size_t curPKToffset = 0;
933  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
934  sendPacketOffsets[pp] = curPKToffset;
935  size_t numPackets = 0;
936  for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
937  numPackets += numExportPacketsPerLID[j];
938  }
939  if (numPackets > maxNumPackets) maxNumPackets = numPackets;
940  packetsPerSend[pp] = numPackets;
941  curPKToffset += numPackets;
942  }
943 
944  deep_copy_offset(imports, exports, selfReceiveOffset,
945  sendPacketOffsets[0], packetsPerSend[0]);
946  }
947 #endif // HAVE_TPETRA_MPI
948 
949  const int myProcID = plan.getComm()->getRank ();
950  size_t selfReceiveOffset = 0;
951 
952 #ifdef HAVE_TPETRA_DEBUG
953  // Different messages may have different numbers of packets.
954  size_t totalNumImportPackets = 0;
955  for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
956  totalNumImportPackets += numImportPacketsPerLID[ii];
957  }
958  TEUCHOS_TEST_FOR_EXCEPTION(
959  imports.extent (0) < totalNumImportPackets, std::runtime_error,
960  "Tpetra::Distributor::doPosts(4 args, Kokkos): The 'imports' array must have "
961  "enough entries to hold the expected number of import packets. "
962  "imports.extent(0) = " << imports.extent (0) << " < "
963  "totalNumImportPackets = " << totalNumImportPackets << ".");
964  TEUCHOS_TEST_FOR_EXCEPTION
965  (requestsRecv_.size () != 0, std::logic_error, "Tpetra::Distributor::"
966  "doPosts(4 args, Kokkos): Process " << myProcID << ": requestsRecv_.size () = "
967  << requestsRecv_.size () << " != 0.");
968  TEUCHOS_TEST_FOR_EXCEPTION
969  (requestsSend_.size () != 0, std::logic_error, "Tpetra::Distributor::"
970  "doPosts(4 args, Kokkos): Process " << myProcID << ": requestsSend_.size () = "
971  << requestsSend_.size () << " != 0.");
972 #endif // HAVE_TPETRA_DEBUG
973  // Distributor uses requestsRecv_.size() and requestsSend_.size()
974  // as the number of outstanding nonblocking message requests, so
975  // we resize to zero to maintain this invariant.
976  //
977  // getNumReceives() does _not_ include the self message, if there is
978  // one. Here, we do actually send a message to ourselves, so we
979  // include any self message in the "actual" number of receives to
980  // post.
981  //
982  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
983  // doesn't (re)allocate its array of requests. That happens in
984  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
985  // demand), or Resize_().
986  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
987  as<size_type> (plan.hasSelfMessage() ? 1 : 0);
988  requestsRecv_.resize (0);
989  requestsSend_.resize (0);
990 
991  // Post the nonblocking receives. It's common MPI wisdom to post
992  // receives before sends. In MPI terms, this means favoring
993  // adding to the "posted queue" (of receive requests) over adding
994  // to the "unexpected queue" (of arrived messages not yet matched
995  // with a receive).
996  {
997 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
998  Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_);
999 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1000 
1001  size_t curBufferOffset = 0;
1002  size_t curLIDoffset = 0;
1003  for (size_type i = 0; i < actualNumReceives; ++i) {
1004  size_t totalPacketsFrom_i = 0;
1005  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
1006  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
1007  }
1008  // totalPacketsFrom_i is converted down to int, so make sure it can be represented
1009  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
1010  std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
1011  "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large "
1012  "to be represented as int.");
1013  curLIDoffset += plan.getLengthsFrom()[i];
1014  if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
1015  // If my process is receiving these packet(s) from another
1016  // process (not a self-receive), and if there is at least
1017  // one packet to receive:
1018  //
1019  // 1. Set up the persisting view (recvBuf) into the imports
1020  // array, given the offset and size (total number of
1021  // packets from process getProcsFrom()[i]).
1022  // 2. Start the Irecv and save the resulting request.
1023  imports_view_type recvBuf =
1024  subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
1025  requestsRecv_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
1026  mpiTag_, *plan.getComm()));
1027  }
1028  else { // Receiving these packet(s) from myself
1029  selfReceiveOffset = curBufferOffset; // Remember the offset
1030  }
1031  curBufferOffset += totalPacketsFrom_i;
1032  }
1033  }
1034 
1035 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1036  Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_);
1037 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1038 
1039  // setup arrays containing starting-offsets into exports for each send,
1040  // and num-packets-to-send for each send.
1041  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
1042  size_t maxNumPackets = 0;
1043  size_t curPKToffset = 0;
1044  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
1045  sendPacketOffsets[pp] = curPKToffset;
1046  size_t numPackets = 0;
1047  for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
1048  numPackets += numExportPacketsPerLID[j];
1049  }
1050  if (numPackets > maxNumPackets) maxNumPackets = numPackets;
1051  // numPackets will be used as a message length, so make sure it can be represented as int
1052  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX),
1053  std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
1054  "packetsPerSend[" << pp << "] = " << numPackets << " is too large "
1055  "to be represented as int.");
1056  packetsPerSend[pp] = numPackets;
1057  curPKToffset += numPackets;
1058  }
1059 
1060  // setup scan through getProcsTo() list starting with higher numbered procs
1061  // (should help balance message traffic)
1062  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
1063  size_t procIndex = 0;
1064  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) {
1065  ++procIndex;
1066  }
1067  if (procIndex == numBlocks) {
1068  procIndex = 0;
1069  }
1070 
1071  size_t selfNum = 0;
1072  size_t selfIndex = 0;
1073  if (plan.getIndicesTo().is_null()) {
1074 
1075 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1076  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_);
1077 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1078 
1079  // Data are already blocked (laid out) by process, so we don't
1080  // need a separate send buffer (besides the exports array).
1081  for (size_t i = 0; i < numBlocks; ++i) {
1082  size_t p = i + procIndex;
1083  if (p > (numBlocks - 1)) {
1084  p -= numBlocks;
1085  }
1086 
1087  if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) {
1088  exports_view_type tmpSend =
1089  subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);
1090 
1091  if (sendType == Details::DISTRIBUTOR_ISEND) {
1092  exports_view_type tmpSendBuf =
1093  subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
1094  requestsSend_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
1095  mpiTag_, *plan.getComm()));
1096  }
1097  else { // DISTRIBUTOR_SEND
1098  send<int> (tmpSend,
1099  as<int> (tmpSend.size ()),
1100  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1101  }
1102  }
1103  else { // "Sending" the message to myself
1104  selfNum = p;
1105  }
1106  }
1107 
1108  if (plan.hasSelfMessage()) {
1109  deep_copy_offset(imports, exports, selfReceiveOffset,
1110  sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
1111  }
1112  }
1113  else { // data are not blocked by proc, use send buffer
1114 
1115 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1116  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_);
1117 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1118 
1119  // FIXME (mfh 05 Mar 2013) This may be broken for Isend.
1120  typedef typename ExpView::non_const_value_type Packet;
1121  typedef typename ExpView::array_layout Layout;
1122  typedef typename ExpView::device_type Device;
1123  typedef typename ExpView::memory_traits Mem;
1124 
1125  // This buffer is long enough for only one message at a time.
1126  // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
1127  // of sendType requested by user.
1128  // This code path formerly errored out with message:
1129  // Tpetra::Distributor::doPosts(4-arg, Kokkos):
1130  // The "send buffer" code path
1131  // doesn't currently work with nonblocking sends.
1132  // Now, we opt to just do the communication in a way that works.
1133 #ifdef HAVE_TPETRA_DEBUG
1134  if (sendType != Details::DISTRIBUTOR_SEND) {
1135  if (plan.getComm()->getRank() == 0)
1136  std::cout << "The requested Tpetra send type "
1137  << DistributorSendTypeEnumToString(sendType)
1138  << " requires Distributor data to be ordered by"
1139  << " the receiving processor rank. Since these"
1140  << " data are not ordered, Tpetra will use Send"
1141  << " instead." << std::endl;
1142  }
1143 #endif
1144 
1145  Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
1146  maxNumPackets);
1147 
1148  Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
1149  size_t ioffset = 0;
1150  for (int j=0; j<numExportPacketsPerLID.size(); ++j) {
1151  indicesOffsets[j] = ioffset;
1152  ioffset += numExportPacketsPerLID[j];
1153  }
1154 
1155  for (size_t i = 0; i < numBlocks; ++i) {
1156  size_t p = i + procIndex;
1157  if (p > (numBlocks - 1)) {
1158  p -= numBlocks;
1159  }
1160 
1161  if (plan.getProcsTo()[p] != myProcID) {
1162  size_t sendArrayOffset = 0;
1163  size_t j = plan.getStartsTo()[p];
1164  size_t numPacketsTo_p = 0;
1165  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
1166  numPacketsTo_p += numExportPacketsPerLID[j];
1167  deep_copy_offset(sendArray, exports, sendArrayOffset,
1168  indicesOffsets[j], numExportPacketsPerLID[j]);
1169  sendArrayOffset += numExportPacketsPerLID[j];
1170  }
1171  typename ExpView::execution_space().fence();
1172 
1173  if (numPacketsTo_p > 0) {
1174  ImpView tmpSend =
1175  subview_offset(sendArray, size_t(0), numPacketsTo_p);
1176 
1177  send<int> (tmpSend,
1178  as<int> (tmpSend.size ()),
1179  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1180  }
1181  }
1182  else { // "Sending" the message to myself
1183  selfNum = p;
1184  selfIndex = plan.getStartsTo()[p];
1185  }
1186  }
1187 
1188  if (plan.hasSelfMessage()) {
1189  for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
1190  deep_copy_offset(imports, exports, selfReceiveOffset,
1191  indicesOffsets[selfIndex],
1192  numExportPacketsPerLID[selfIndex]);
1193  selfReceiveOffset += numExportPacketsPerLID[selfIndex];
1194  ++selfIndex;
1195  }
1196  }
1197  }
1198 }
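// Added note: as in the 3-argument doPosts, the "slow" path above (non-null
// getIndicesTo()) always uses blocking sends out of the single staging buffer
// 'sendArray', sized here by the largest per-destination packet count
// (maxNumPackets).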
1199 
1200 }
1201 }
1202 
1203 #endif