Tpetra_Details_DistributorActor.hpp
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 // clang-format off
11 
12 #ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
13 #define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
14 
15 #include "Tpetra_Details_DistributorPlan.hpp"
16 #include "Tpetra_Util.hpp"
17 
18 #include "Teuchos_Array.hpp"
19 #include "Teuchos_Comm.hpp"
20 #include "Teuchos_CommHelpers.hpp"
21 #include "Teuchos_RCP.hpp"
22 #include "Teuchos_Time.hpp"
23 
24 #include "Kokkos_TeuchosCommAdapters.hpp"
25 
26 #ifdef HAVE_TPETRA_MPI
27 #include "mpi.h"
28 #endif
29 
30 namespace Tpetra {
31 namespace Details {
32 
33 template <class View1, class View2>
34 constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;
35 
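// DistributorActor carries out the communication described by a
// DistributorPlan: doPosts() posts the (possibly nonblocking) sends and
// receives for a given exports/imports buffer pair, and doWaits() completes
// any outstanding requests. doPostsAndWaits() combines the two steps.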
36 class DistributorActor {
37  static constexpr int DEFAULT_MPI_TAG = 1;
38 
39 public:
40  DistributorActor();
41  DistributorActor(const DistributorActor& otherActor);
42 
43  template <class ExpView, class ImpView>
44  void doPostsAndWaits(const DistributorPlan& plan,
45  const ExpView &exports,
46  size_t numPackets,
47  const ImpView &imports);
48 
49  template <class ExpView, class ImpView>
50  void doPostsAndWaits(const DistributorPlan& plan,
51  const ExpView &exports,
52  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
53  const ImpView &imports,
54  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
55 
56  template <class ExpView, class ImpView>
57  void doPosts(const DistributorPlan& plan,
58  const ExpView& exports,
59  size_t numPackets,
60  const ImpView& imports);
61 
62  template <class ExpView, class ImpView>
63  void doPosts(const DistributorPlan& plan,
64  const ExpView &exports,
65  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
66  const ImpView &imports,
67  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
68 
69  void doWaits(const DistributorPlan& plan);
70 
71  bool isReady() const;
72 
73 private:
74 // clang-format on
75 #ifdef HAVE_TPETRA_MPI
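 // Collective-based implementations of doPosts. These are used in place of
 // point-to-point messages when the plan's send type is "Alltoall" or one of
 // the MPI Advance variants below.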
76  template <class ExpView, class ImpView>
77  void doPostsAllToAll(const DistributorPlan &plan, const ExpView &exports,
78  size_t numPackets, const ImpView &imports);
79 
80  template <class ExpView, class ImpView>
81  void doPostsAllToAll(
82  const DistributorPlan &plan, const ExpView &exports,
83  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
84  const ImpView &imports,
85  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
86 
87 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
88  template <class ExpView, class ImpView>
89  void doPostsNbrAllToAllV(const DistributorPlan &plan, const ExpView &exports,
90  size_t numPackets, const ImpView &imports);
91 
92  template <class ExpView, class ImpView>
93  void doPostsNbrAllToAllV(
94  const DistributorPlan &plan, const ExpView &exports,
95  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
96  const ImpView &imports,
97  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
98 #endif // HAVE_TPETRACORE_MPI_ADVANCE
99 #endif // HAVE_TPETRA_MPI
100  // clang-format off
101  int mpiTag_;
102 
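 // Nonblocking communication requests posted by doPosts*(). requests_.size()
 // is used as the number of outstanding requests (see the notes in doPosts
 // below); doWaits() completes them.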
103  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requests_;
104 
105 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
106  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_;
107  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_;
108  Teuchos::RCP<Teuchos::Time> timer_doWaits_;
109  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_recvs_;
110  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_recvs_;
111  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_barrier_;
112  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_barrier_;
113  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_;
114  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_;
115  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_slow_;
116  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_slow_;
117  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_fast_;
118  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_fast_;
119 
121  void makeTimers();
122 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
123 };
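// Typical usage (sketch; 'plan', 'exports', and 'imports' come from the
// caller, with exports/imports being Kokkos::Views sized to match the plan):
//
//   DistributorActor actor;
//   actor.doPosts(plan, exports, numPackets, imports); // post recvs + sends
//   // ... other work may overlap the communication here ...
//   actor.doWaits(plan);                               // complete requests
//
// or, equivalently, actor.doPostsAndWaits(plan, exports, numPackets, imports).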
124 
125 template <class ExpView, class ImpView>
126 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
127  const ExpView& exports,
128  size_t numPackets,
129  const ImpView& imports)
130 {
131  static_assert(areKokkosViews<ExpView, ImpView>,
132  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
133  doPosts(plan, exports, numPackets, imports);
134  doWaits(plan);
135 }
136 
137 template <class ExpView, class ImpView>
138 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
139  const ExpView& exports,
140  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
141  const ImpView& imports,
142  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
143 {
144  static_assert(areKokkosViews<ExpView, ImpView>,
145  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
146  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
147  doWaits(plan);
148 }
149 
150 template <typename ViewType>
151 using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;
152 
153 template <typename DstViewType, typename SrcViewType>
154 using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
155  HostAccessibility<SrcViewType>::accessible>;
156 
157 template <typename DstViewType, typename SrcViewType>
158 using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
159  !HostAccessibility<SrcViewType>::accessible>;
160 
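// packOffset copies 'size' elements from src (starting at src_offset) into
// dst (starting at dst_offset). The overload is chosen at compile time: if
// both Views are host-accessible the copy is a plain memcpy; otherwise it
// goes through Kokkos::Compat::deep_copy_offset.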
161 template <typename DstViewType, typename SrcViewType>
162 enableIfHostAccessible<DstViewType, SrcViewType>
163 packOffset(const DstViewType& dst,
164  const SrcViewType& src,
165  const size_t dst_offset,
166  const size_t src_offset,
167  const size_t size)
168 {
169  memcpy((void*) (dst.data()+dst_offset), src.data()+src_offset, size*sizeof(typename DstViewType::value_type));
170 }
171 
172 template <typename DstViewType, typename SrcViewType>
173 enableIfNotHostAccessible<DstViewType, SrcViewType>
174 packOffset(const DstViewType& dst,
175  const SrcViewType& src,
176  const size_t dst_offset,
177  const size_t src_offset,
178  const size_t size)
179 {
180  Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
181 }
182 
183 // clang-format on
184 #ifdef HAVE_TPETRA_MPI
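// doPostsAllToAll (fixed packet count): builds per-rank send/receive counts
// and displacements from the plan, then performs the whole exchange with a
// single MPI_Alltoallv (or MPIX_Alltoallv when MPI Advance is enabled).
// Only valid on the fast path, i.e. when plan.getIndicesTo() is null.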
185 template <class ExpView, class ImpView>
186 void DistributorActor::doPostsAllToAll(const DistributorPlan &plan,
187  const ExpView &exports,
188  size_t numPackets,
189  const ImpView &imports) {
190  using size_type = Teuchos::Array<size_t>::size_type;
191 
192  TEUCHOS_TEST_FOR_EXCEPTION(
193  !plan.getIndicesTo().is_null(), std::runtime_error,
194  "Send Type=\"Alltoall\" only works for fast-path communication.");
195 
196  auto comm = plan.getComm();
197  const int myRank = comm->getRank();
198  std::vector<int> sendcounts(comm->getSize(), 0);
199  std::vector<int> sdispls(comm->getSize(), 0);
200  std::vector<int> recvcounts(comm->getSize(), 0);
201  std::vector<int> rdispls(comm->getSize(), 0);
202 
203  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
204  for (size_t p = 0; p < numBlocks; ++p) {
205  sdispls[plan.getProcsTo()[p]] = plan.getStartsTo()[p] * numPackets;
206  size_t sendcount = plan.getLengthsTo()[p] * numPackets;
207  // sendcount is converted down to int, so make sure it can be represented
208  TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
209  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
210  "Send count for block "
211  << p << " (" << sendcount
212  << ") is too large "
213  "to be represented as int.");
214  sendcounts[plan.getProcsTo()[p]] = static_cast<int>(sendcount);
215  }
216 
217  const size_type actualNumReceives =
218  Teuchos::as<size_type>(plan.getNumReceives()) +
219  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
220  size_t curBufferOffset = 0;
221  for (size_type i = 0; i < actualNumReceives; ++i) {
222  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
223  TEUCHOS_TEST_FOR_EXCEPTION(
224  curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
225  std::logic_error,
226  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
227  "Exceeded size of 'imports' array in packing loop on Process "
228  << myRank << ". imports.size() = " << imports.size()
229  << " < "
230  "curBufferOffset("
231  << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
232  rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
233  // curBufLen is converted down to int, so make sure it can be represented
234  TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
235  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
236  "Recv count for receive "
237  << i << " (" << curBufLen
238  << ") is too large "
239  "to be represented as int.");
240  recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(curBufLen);
241  curBufferOffset += curBufLen;
242  }
243 
244  using T = typename ExpView::non_const_value_type;
245  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
246 
247 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
248  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
249  MPIX_Comm *mpixComm = *plan.getMPIXComm();
250  TEUCHOS_TEST_FOR_EXCEPTION(
251  !mpixComm, std::runtime_error,
252  "plan's MPIX_Comm null in doPostsAllToAll, but "
253  "DISTRIBUTOR_MPIADVANCE_ALLTOALL set: plan.howInitialized()="
254  << DistributorHowInitializedEnumToString(plan.howInitialized()));
255 
256  const int err = MPIX_Alltoallv(
257  exports.data(), sendcounts.data(), sdispls.data(), rawType,
258  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
259 
260  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
261  "MPIX_Alltoallv failed with error \""
262  << Teuchos::mpiErrorCodeToString(err)
263  << "\".");
264 
265  return;
266  }
267 #endif
268  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
269  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
270  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
271  mpiComm->getRawMpiComm();
272 
273  const int err = MPI_Alltoallv(
274  exports.data(), sendcounts.data(), sdispls.data(), rawType,
275  imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
276 
277  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
278  "MPI_Alltoallv failed with error \""
279  << Teuchos::mpiErrorCodeToString(err)
280  << "\".");
281 
282  return;
283 }
284 
285 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
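// doPostsNbrAllToAllV (fixed packet count): like doPostsAllToAll, but uses
// MPIX_Neighbor_alltoallv on the plan's MPIX_Comm. Unlike the standard
// all-to-all, entry i of the count/displacement arrays refers to the i-th
// participating rank rather than to rank i.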
286 template <class ExpView, class ImpView>
287 void DistributorActor::doPostsNbrAllToAllV(const DistributorPlan &plan,
288  const ExpView &exports,
289  size_t numPackets,
290  const ImpView &imports) {
291  TEUCHOS_TEST_FOR_EXCEPTION(
292  !plan.getIndicesTo().is_null(), std::runtime_error,
293  "Send Type=\"Alltoall\" only works for fast-path communication.");
294 
295  const int myRank = plan.getComm()->getRank();
296  MPIX_Comm *mpixComm = *plan.getMPIXComm();
297 
298  const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
299  const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
300  std::vector<int> sendcounts(numSends, 0);
301  std::vector<int> sdispls(numSends, 0);
302  std::vector<int> recvcounts(numRecvs, 0);
303  std::vector<int> rdispls(numRecvs, 0);
304 
305  for (size_t p = 0; p < numSends; ++p) {
306  sdispls[p] = plan.getStartsTo()[p] * numPackets;
307  const size_t sendcount = plan.getLengthsTo()[p] * numPackets;
308  // sendcount is converted down to int, so make sure it can be represented
309  TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
310  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
311  "Send count for block "
312  << p << " (" << sendcount
313  << ") is too large "
314  "to be represented as int.");
315  sendcounts[p] = static_cast<int>(sendcount);
316  }
317 
318  size_t curBufferOffset = 0;
319  for (size_t i = 0; i < numRecvs; ++i) {
320  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
321  TEUCHOS_TEST_FOR_EXCEPTION(
322  curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
323  std::logic_error,
324  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
325  "Exceeded size of 'imports' array in packing loop on Process "
326  << myRank << ". imports.size() = " << imports.size()
327  << " < "
328  "curBufferOffset("
329  << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
330  rdispls[i] = curBufferOffset;
331  // curBufLen is converted down to int, so make sure it can be represented
332  TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
333  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
334  "Recv count for receive "
335  << i << " (" << curBufLen
336  << ") is too large "
337  "to be represented as int.");
338  recvcounts[i] = static_cast<int>(curBufLen);
339  curBufferOffset += curBufLen;
340  }
341 
342  using T = typename ExpView::non_const_value_type;
343  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
344 
345  const int err = MPIX_Neighbor_alltoallv(
346  exports.data(), sendcounts.data(), sdispls.data(), rawType,
347  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
348 
349  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
350  "MPIX_Neighbor_alltoallv failed with error \""
351  << Teuchos::mpiErrorCodeToString(err)
352  << "\".");
353 }
354 #endif // HAVE_TPETRACORE_MPI_ADVANCE
355 #endif // HAVE_TPETRA_MPI
356 // clang-format off
357 
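// doPosts (fixed packet count): dispatches to a collective implementation if
// the plan asks for one; otherwise posts nonblocking receives first, then the
// sends (blocking or nonblocking, per the plan's send type). Self-messages
// are handled by a local deep copy rather than by MPI.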
358 template <class ExpView, class ImpView>
359 void DistributorActor::doPosts(const DistributorPlan& plan,
360  const ExpView& exports,
361  size_t numPackets,
362  const ImpView& imports)
363 {
364  static_assert(areKokkosViews<ExpView, ImpView>,
365  "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
366  using Teuchos::Array;
367  using Teuchos::as;
368  using Teuchos::FancyOStream;
369  using Teuchos::includesVerbLevel;
370  using Teuchos::ireceive;
371  using Teuchos::isend;
372  using Teuchos::send;
373  using Teuchos::TypeNameTraits;
374  using Teuchos::typeName;
375  using std::endl;
376  using Kokkos::Compat::create_const_view;
377  using Kokkos::Compat::create_view;
378  using Kokkos::Compat::subview_offset;
379  using Kokkos::Compat::deep_copy_offset;
380  typedef Array<size_t>::size_type size_type;
381  typedef ExpView exports_view_type;
382  typedef ImpView imports_view_type;
383 
384 #ifdef KOKKOS_ENABLE_CUDA
385  static_assert
386  (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
387  ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
388  "Please do not use Tpetra::Distributor with UVM allocations. "
389  "See Trilinos GitHub issue #1088.");
390 #endif // KOKKOS_ENABLE_CUDA
391 
392 #ifdef KOKKOS_ENABLE_SYCL
393  static_assert
394  (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
395  ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
396  "Please do not use Tpetra::Distributor with SharedUSM allocations. "
397  "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
398 #endif // KOKKOS_ENABLE_SYCL
399 
400 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
401  Teuchos::TimeMonitor timeMon (*timer_doPosts3KV_);
402 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
403 
404  const int myRank = plan.getComm()->getRank ();
405  // Run-time configurable parameters that come from the input
406  // ParameterList set by setParameterList().
407  const Details::EDistributorSendType sendType = plan.getSendType();
408 
409 // clang-format on
410 #if defined(HAVE_TPETRA_MPI)
411  // All-to-all communication layout is quite different from
412  // point-to-point, so we handle it separately.
413 
414  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
415  doPostsAllToAll(plan, exports, numPackets, imports);
416  return;
417  }
418 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
419  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
420  doPostsAllToAll(plan, exports, numPackets, imports);
421  return;
422  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
423  doPostsNbrAllToAllV(plan, exports, numPackets, imports);
424  return;
425  }
426 #endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)
427 // clang-format off
428 
429 #else // HAVE_TPETRA_MPI
430  if (plan.hasSelfMessage()) {
431  // This is how we "send a message to ourself": we copy from
432  // the export buffer to the import buffer. That saves
433  // Teuchos::Comm implementations other than MpiComm (in
434  // particular, SerialComm) the trouble of implementing self
435  // messages correctly. (To do this right, SerialComm would
436  // need internal buffer space for messages, keyed on the
437  // message's tag.)
438  size_t selfReceiveOffset = 0;
439  deep_copy_offset(imports, exports, selfReceiveOffset,
440  plan.getStartsTo()[0]*numPackets,
441  plan.getLengthsTo()[0]*numPackets);
442  }
443  // should we just return here?
444  // likely not as comm could be a serial comm
445 #endif // HAVE_TPETRA_MPI
446 
447  size_t selfReceiveOffset = 0;
448 
449 #ifdef HAVE_TPETRA_DEBUG
450  TEUCHOS_TEST_FOR_EXCEPTION
451  (requests_.size () != 0,
452  std::logic_error,
453  "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
454  << myRank << ": requests_.size() = " << requests_.size () << " != 0.");
455 #endif // HAVE_TPETRA_DEBUG
456 
457  // Distributor uses requests_.size() as the number of outstanding
458  // nonblocking message requests, so we resize to zero to maintain
459  // this invariant.
460  //
461  // getNumReceives() does _not_ include the self message, if there is
462  // one. Here, we do actually send a message to ourselves, so we
463  // include any self message in the "actual" number of receives to
464  // post.
465  //
466  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
467  // doesn't (re)allocate its array of requests. That happens in
468  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
469  // demand), or Resize_().
470  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
471  as<size_type> (plan.hasSelfMessage() ? 1 : 0);
472  requests_.resize (0);
473 
474  // Post the nonblocking receives. It's common MPI wisdom to post
475  // receives before sends. In MPI terms, this means favoring
476  // adding to the "posted queue" (of receive requests) over adding
477  // to the "unexpected queue" (of arrived messages not yet matched
478  // with a receive).
479  {
480 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
481  Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3KV_recvs_);
482 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
483 
484  size_t curBufferOffset = 0;
485  for (size_type i = 0; i < actualNumReceives; ++i) {
486  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
487  if (plan.getProcsFrom()[i] != myRank) {
488  // If my process is receiving these packet(s) from another
489  // process (not a self-receive):
490  //
491  // 1. Set up the persisting view (recvBuf) of the imports
492  // array, given the offset and size (total number of
493  // packets from process getProcsFrom()[i]).
494  // 2. Start the Irecv and save the resulting request.
495  TEUCHOS_TEST_FOR_EXCEPTION(
496  curBufferOffset + curBufLen > static_cast<size_t> (imports.size ()),
497  std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
498  "Exceeded size of 'imports' array in packing loop on Process " <<
499  myRank << ". imports.size() = " << imports.size () << " < "
500  "curBufferOffset(" << curBufferOffset << ") + curBufLen(" <<
501  curBufLen << ").");
502  imports_view_type recvBuf =
503  subview_offset (imports, curBufferOffset, curBufLen);
504  requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
505  mpiTag_, *plan.getComm()));
506  }
507  else { // Receiving from myself
508  selfReceiveOffset = curBufferOffset; // Remember the self-recv offset
509  }
510  curBufferOffset += curBufLen;
511  }
512  }
513 
514 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
515  Teuchos::TimeMonitor timeMonSends (*timer_doPosts3KV_sends_);
516 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
517 
518  // setup scan through getProcsTo() list starting with higher numbered procs
519  // (should help balance message traffic)
520  //
521  // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
522  // It doesn't depend on the input at all.
523  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
524  size_t procIndex = 0;
525  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
526  ++procIndex;
527  }
528  if (procIndex == numBlocks) {
529  procIndex = 0;
530  }
531 
532  size_t selfNum = 0;
533  size_t selfIndex = 0;
534 
535  if (plan.getIndicesTo().is_null()) {
536 
537 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
538  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_fast_);
539 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
540 
541  // Data are already blocked (laid out) by process, so we don't
542  // need a separate send buffer (besides the exports array).
543  for (size_t i = 0; i < numBlocks; ++i) {
544  size_t p = i + procIndex;
545  if (p > (numBlocks - 1)) {
546  p -= numBlocks;
547  }
548 
549  if (plan.getProcsTo()[p] != myRank) {
550  exports_view_type tmpSend = subview_offset(
551  exports, plan.getStartsTo()[p]*numPackets, plan.getLengthsTo()[p]*numPackets);
552 
553  if (sendType == Details::DISTRIBUTOR_ISEND) {
554  // NOTE: This looks very similar to the tmpSend above, but removing
555  // tmpSendBuf and using tmpSend leads to a performance hit on Arm
556  // SerialNode builds
557  exports_view_type tmpSendBuf =
558  subview_offset (exports, plan.getStartsTo()[p] * numPackets,
559  plan.getLengthsTo()[p] * numPackets);
560  requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
561  mpiTag_, *plan.getComm()));
562  }
563  else { // DISTRIBUTOR_SEND
564  send<int> (tmpSend,
565  as<int> (tmpSend.size ()),
566  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
567  }
568  }
569  else { // "Sending" the message to myself
570  selfNum = p;
571  }
572  }
573 
574  if (plan.hasSelfMessage()) {
575  // This is how we "send a message to ourself": we copy from
576  // the export buffer to the import buffer. That saves
577  // Teuchos::Comm implementations other than MpiComm (in
578  // particular, SerialComm) the trouble of implementing self
579  // messages correctly. (To do this right, SerialComm would
580  // need internal buffer space for messages, keyed on the
581  // message's tag.)
582  deep_copy_offset(imports, exports, selfReceiveOffset,
583  plan.getStartsTo()[selfNum]*numPackets,
584  plan.getLengthsTo()[selfNum]*numPackets);
585  }
586 
587  }
588  else { // data are not blocked by proc, use send buffer
589 
590 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
591  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_slow_);
592 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
593 
594  typedef typename ExpView::non_const_value_type Packet;
595  typedef typename ExpView::array_layout Layout;
596  typedef typename ExpView::device_type Device;
597  typedef typename ExpView::memory_traits Mem;
598 
599  // This buffer is long enough for only one message at a time.
600  // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
601  // of sendType requested by user.
602  // This code path formerly errored out with message:
603  // Tpetra::Distributor::doPosts(3 args, Kokkos):
604  // The "send buffer" code path
605  // doesn't currently work with nonblocking sends.
606  // Now, we opt to just do the communication in a way that works.
607 #ifdef HAVE_TPETRA_DEBUG
608  if (sendType != Details::DISTRIBUTOR_SEND) {
609  if (plan.getComm()->getRank() == 0)
610  std::cout << "The requested Tpetra send type "
611  << DistributorSendTypeEnumToString(sendType)
612  << " requires Distributor data to be ordered by"
613  << " the receiving processor rank. Since these"
614  << " data are not ordered, Tpetra will use Send"
615  << " instead." << std::endl;
616  }
617 #endif
618 
619  Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
620  plan.getMaxSendLength() * numPackets);
621 
622  for (size_t i = 0; i < numBlocks; ++i) {
623  size_t p = i + procIndex;
624  if (p > (numBlocks - 1)) {
625  p -= numBlocks;
626  }
627 
628  if (plan.getProcsTo()[p] != myRank) {
629  size_t sendArrayOffset = 0;
630  size_t j = plan.getStartsTo()[p];
631  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
632  packOffset(sendArray, exports, sendArrayOffset, plan.getIndicesTo()[j]*numPackets, numPackets);
633  sendArrayOffset += numPackets;
634  }
635  typename ExpView::execution_space().fence();
636 
637  ImpView tmpSend =
638  subview_offset(sendArray, size_t(0), plan.getLengthsTo()[p]*numPackets);
639 
640  send<int> (tmpSend,
641  as<int> (tmpSend.size ()),
642  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
643  }
644  else { // "Sending" the message to myself
645  selfNum = p;
646  selfIndex = plan.getStartsTo()[p];
647  }
648  }
649 
650  if (plan.hasSelfMessage()) {
651  for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
652  packOffset(imports, exports, selfReceiveOffset, plan.getIndicesTo()[selfIndex]*numPackets, numPackets);
653  ++selfIndex;
654  selfReceiveOffset += numPackets;
655  }
656  }
657  }
658 }
659 
660 // clang-format on
661 #ifdef HAVE_TPETRA_MPI
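// Variable-packet-count (4-argument) variant of doPostsAllToAll: per-rank
// counts are accumulated from numExportPacketsPerLID / numImportPacketsPerLID
// instead of using a fixed numPackets per LID.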
662 template <class ExpView, class ImpView>
663 void DistributorActor::doPostsAllToAll(
664  const DistributorPlan &plan, const ExpView &exports,
665  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
666  const ImpView &imports,
667  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
668  TEUCHOS_TEST_FOR_EXCEPTION(
669  !plan.getIndicesTo().is_null(), std::runtime_error,
670  "Send Type=\"Alltoall\" only works for fast-path communication.");
671 
672  using size_type = Teuchos::Array<size_t>::size_type;
673 
674  auto comm = plan.getComm();
675  std::vector<int> sendcounts(comm->getSize(), 0);
676  std::vector<int> sdispls(comm->getSize(), 0);
677  std::vector<int> recvcounts(comm->getSize(), 0);
678  std::vector<int> rdispls(comm->getSize(), 0);
679 
680  size_t curPKToffset = 0;
681  for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
682  sdispls[plan.getProcsTo()[pp]] = curPKToffset;
683  size_t numPackets = 0;
684  for (size_t j = plan.getStartsTo()[pp];
685  j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
686  numPackets += numExportPacketsPerLID[j];
687  }
688  // numPackets is converted down to int, so make sure it can be represented
689  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
690  "Tpetra::Distributor::doPosts(4 args, Kokkos): "
691  "Send count for send "
692  << pp << " (" << numPackets
693  << ") is too large "
694  "to be represented as int.");
695  sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
696  curPKToffset += numPackets;
697  }
698 
699  const size_type actualNumReceives =
700  Teuchos::as<size_type>(plan.getNumReceives()) +
701  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
702 
703  size_t curBufferOffset = 0;
704  size_t curLIDoffset = 0;
705  for (size_type i = 0; i < actualNumReceives; ++i) {
706  size_t totalPacketsFrom_i = 0;
707  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
708  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
709  }
710  curLIDoffset += plan.getLengthsFrom()[i];
711 
712  rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
713  // totalPacketsFrom_i is converted down to int, so make sure it can be
714  // represented
715  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
716  std::logic_error,
717  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
718  "Recv count for receive "
719  << i << " (" << totalPacketsFrom_i
720  << ") is too large "
721  "to be represented as int.");
722  recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
723  curBufferOffset += totalPacketsFrom_i;
724  }
725 
726  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
727  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
728  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
729  mpiComm->getRawMpiComm();
730  using T = typename ExpView::non_const_value_type;
731  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
732 
733 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
734  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
735  MPIX_Comm *mpixComm = *plan.getMPIXComm();
736  TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
737  "MPIX_Comm is null in doPostsAllToAll at "
738  << __FILE__ << ":" << __LINE__ << ".");
739 
740  const int err = MPIX_Alltoallv(
741  exports.data(), sendcounts.data(), sdispls.data(), rawType,
742  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
743 
744  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
745  "MPIX_Alltoallv failed with error \""
746  << Teuchos::mpiErrorCodeToString(err)
747  << "\".");
748 
749  return;
750  }
751 #endif // HAVE_TPETRACORE_MPI_ADVANCE
752 
753  const int err = MPI_Alltoallv(
754  exports.data(), sendcounts.data(), sdispls.data(), rawType,
755  imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
756 
757  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
758  "MPI_Alltoallv failed with error \""
759  << Teuchos::mpiErrorCodeToString(err)
760  << "\".");
761 }
762 
763 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
764 template <class ExpView, class ImpView>
765 void DistributorActor::doPostsNbrAllToAllV(
766  const DistributorPlan &plan, const ExpView &exports,
767  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
768  const ImpView &imports,
769  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
770  TEUCHOS_TEST_FOR_EXCEPTION(
771  !plan.getIndicesTo().is_null(), std::runtime_error,
772  "Send Type=\"Alltoall\" only works for fast-path communication.");
773 
774  const Teuchos_Ordinal numSends = plan.getProcsTo().size();
775  const Teuchos_Ordinal numRecvs = plan.getProcsFrom().size();
776 
777  auto comm = plan.getComm();
778  std::vector<int> sendcounts(numSends, 0);
779  std::vector<int> sdispls(numSends, 0);
780  std::vector<int> recvcounts(numRecvs, 0);
781  std::vector<int> rdispls(numRecvs, 0);
782 
783  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
784  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
785  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
786  mpiComm->getRawMpiComm();
787  using T = typename ExpView::non_const_value_type;
788  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
789 
790  // unlike standard alltoall, entry `i` in sdispls and sendcounts
791  // refer to the ith participating rank, rather than rank i
792  size_t curPKToffset = 0;
793  for (Teuchos_Ordinal pp = 0; pp < numSends; ++pp) {
794  sdispls[pp] = curPKToffset;
795  size_t numPackets = 0;
796  for (size_t j = plan.getStartsTo()[pp];
797  j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
798  numPackets += numExportPacketsPerLID[j];
799  }
800  // numPackets is converted down to int, so make sure it can be represented
801  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
802  "Tpetra::Distributor::doPosts(4 args, Kokkos): "
803  "Send count for send "
804  << pp << " (" << numPackets
805  << ") is too large "
806  "to be represented as int.");
807  sendcounts[pp] = static_cast<int>(numPackets);
808  curPKToffset += numPackets;
809  }
810  size_t curBufferOffset = 0;
811  size_t curLIDoffset = 0;
812  for (Teuchos_Ordinal i = 0; i < numRecvs; ++i) {
813  size_t totalPacketsFrom_i = 0;
814  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
815  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
816  }
817  curLIDoffset += plan.getLengthsFrom()[i];
818 
819  rdispls[i] = curBufferOffset;
820  // totalPacketsFrom_i is converted down to int, so make sure it can be
821  // represented
822  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
823  std::logic_error,
824  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
825  "Recv count for receive "
826  << i << " (" << totalPacketsFrom_i
827  << ") is too large "
828  "to be represented as int.");
829  recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
830  curBufferOffset += totalPacketsFrom_i;
831  }
832 
833  MPIX_Comm *mpixComm = *plan.getMPIXComm();
834  const int err = MPIX_Neighbor_alltoallv(
835  exports.data(), sendcounts.data(), sdispls.data(), rawType,
836  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
837 
838  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
839  "MPIX_Neighbor_alltoallv failed with error \""
840  << Teuchos::mpiErrorCodeToString(err)
841  << "\".");
842 }
843 #endif // HAVE_TPETRACORE_MPI_ADVANCE
844 #endif // HAVE_TPETRA_MPI
845  // clang-format off
846 
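// doPosts (variable packet count): same structure as the fixed-count doPosts
// above, but message lengths and offsets are accumulated from
// numExportPacketsPerLID and numImportPacketsPerLID.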
847 template <class ExpView, class ImpView>
848 void DistributorActor::doPosts(const DistributorPlan& plan,
849  const ExpView &exports,
850  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
851  const ImpView &imports,
852  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
853 {
854  static_assert(areKokkosViews<ExpView, ImpView>,
855  "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
856  using Teuchos::Array;
857  using Teuchos::as;
858  using Teuchos::ireceive;
859  using Teuchos::isend;
860  using Teuchos::send;
861  using Teuchos::TypeNameTraits;
862  using std::endl;
863  using Kokkos::Compat::create_const_view;
864  using Kokkos::Compat::create_view;
865  using Kokkos::Compat::subview_offset;
866  using Kokkos::Compat::deep_copy_offset;
867  typedef Array<size_t>::size_type size_type;
868  typedef ExpView exports_view_type;
869  typedef ImpView imports_view_type;
870 
871 #ifdef KOKKOS_ENABLE_CUDA
872  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
873  ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
874  "Please do not use Tpetra::Distributor with UVM "
875  "allocations. See GitHub issue #1088.");
876 #endif // KOKKOS_ENABLE_CUDA
877 
878 #ifdef KOKKOS_ENABLE_SYCL
879  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
880  ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
881  "Please do not use Tpetra::Distributor with SharedUSM "
882  "allocations. See GitHub issue #1088 (corresponding to CUDA).");
883 #endif // KOKKOS_ENABLE_SYCL
884 
885 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
886  Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_);
887 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
888 
889  // Run-time configurable parameters that come from the input
890  // ParameterList set by setParameterList().
891  const Details::EDistributorSendType sendType = plan.getSendType();
892 
893 #ifdef HAVE_TPETRA_MPI
894  // All-to-all communication layout is quite different from
895  // point-to-point, so we handle it separately.
896  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
897  doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
898  return;
899  }
900 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
901  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL)
902  {
903  doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
904  return;
905  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
906  doPostsNbrAllToAllV(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
907  return;
908  }
909 #endif
910 
911 #else // HAVE_TPETRA_MPI
912  if (plan.hasSelfMessage()) {
913 
914  size_t selfReceiveOffset = 0;
915 
916  // setup arrays containing starting-offsets into exports for each send,
917  // and num-packets-to-send for each send.
918  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
919  size_t maxNumPackets = 0;
920  size_t curPKToffset = 0;
921  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
922  sendPacketOffsets[pp] = curPKToffset;
923  size_t numPackets = 0;
924  for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
925  numPackets += numExportPacketsPerLID[j];
926  }
927  if (numPackets > maxNumPackets) maxNumPackets = numPackets;
928  packetsPerSend[pp] = numPackets;
929  curPKToffset += numPackets;
930  }
931 
932  deep_copy_offset(imports, exports, selfReceiveOffset,
933  sendPacketOffsets[0], packetsPerSend[0]);
934  }
935 #endif // HAVE_TPETRA_MPI
936 
937  const int myProcID = plan.getComm()->getRank ();
938  size_t selfReceiveOffset = 0;
939 
940 #ifdef HAVE_TPETRA_DEBUG
941  // Different messages may have different numbers of packets.
942  size_t totalNumImportPackets = 0;
943  for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
944  totalNumImportPackets += numImportPacketsPerLID[ii];
945  }
946  TEUCHOS_TEST_FOR_EXCEPTION(
947  imports.extent (0) < totalNumImportPackets, std::runtime_error,
948  "Tpetra::Distributor::doPosts(4 args, Kokkos): The 'imports' array must have "
949  "enough entries to hold the expected number of import packets. "
950  "imports.extent(0) = " << imports.extent (0) << " < "
951  "totalNumImportPackets = " << totalNumImportPackets << ".");
952  TEUCHOS_TEST_FOR_EXCEPTION
953  (requests_.size () != 0, std::logic_error, "Tpetra::Distributor::"
954  "doPosts(4 args, Kokkos): Process " << myProcID << ": requests_.size () = "
955  << requests_.size () << " != 0.");
956 #endif // HAVE_TPETRA_DEBUG
957  // Distributor uses requests_.size() as the number of outstanding
958  // nonblocking message requests, so we resize to zero to maintain
959  // this invariant.
960  //
961  // getNumReceives() does _not_ include the self message, if there is
962  // one. Here, we do actually send a message to ourselves, so we
963  // include any self message in the "actual" number of receives to
964  // post.
965  //
966  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
967  // doesn't (re)allocate its array of requests. That happens in
968  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
969  // demand), or Resize_().
970  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
971  as<size_type> (plan.hasSelfMessage() ? 1 : 0);
972  requests_.resize (0);
973 
974  // Post the nonblocking receives. It's common MPI wisdom to post
975  // receives before sends. In MPI terms, this means favoring
976  // adding to the "posted queue" (of receive requests) over adding
977  // to the "unexpected queue" (of arrived messages not yet matched
978  // with a receive).
979  {
980 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
981  Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_);
982 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
983 
984  size_t curBufferOffset = 0;
985  size_t curLIDoffset = 0;
986  for (size_type i = 0; i < actualNumReceives; ++i) {
987  size_t totalPacketsFrom_i = 0;
988  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
989  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
990  }
991  // totalPacketsFrom_i is converted down to int, so make sure it can be represented
992  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
993  std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
994  "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large "
995  "to be represented as int.");
996  curLIDoffset += plan.getLengthsFrom()[i];
997  if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
998  // If my process is receiving these packet(s) from another
999  // process (not a self-receive), and if there is at least
1000  // one packet to receive:
1001  //
1002  // 1. Set up the persisting view (recvBuf) into the imports
1003  // array, given the offset and size (total number of
1004  // packets from process getProcsFrom()[i]).
1005  // 2. Start the Irecv and save the resulting request.
1006  imports_view_type recvBuf =
1007  subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
1008  requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
1009  mpiTag_, *plan.getComm()));
1010  }
1011  else { // Receiving these packet(s) from myself
1012  selfReceiveOffset = curBufferOffset; // Remember the offset
1013  }
1014  curBufferOffset += totalPacketsFrom_i;
1015  }
1016  }
1017 
1018 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1019  Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_);
1020 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1021 
1022  // setup arrays containing starting-offsets into exports for each send,
1023  // and num-packets-to-send for each send.
1024  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
1025  size_t maxNumPackets = 0;
1026  size_t curPKToffset = 0;
1027  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
1028  sendPacketOffsets[pp] = curPKToffset;
1029  size_t numPackets = 0;
1030  for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
1031  numPackets += numExportPacketsPerLID[j];
1032  }
1033  if (numPackets > maxNumPackets) maxNumPackets = numPackets;
1034  // numPackets will be used as a message length, so make sure it can be represented as int
1035  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX),
1036  std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
1037  "packetsPerSend[" << pp << "] = " << numPackets << " is too large "
1038  "to be represented as int.");
1039  packetsPerSend[pp] = numPackets;
1040  curPKToffset += numPackets;
1041  }
1042 
1043  // setup scan through getProcsTo() list starting with higher numbered procs
1044  // (should help balance message traffic)
1045  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
1046  size_t procIndex = 0;
1047  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) {
1048  ++procIndex;
1049  }
1050  if (procIndex == numBlocks) {
1051  procIndex = 0;
1052  }
1053 
1054  size_t selfNum = 0;
1055  size_t selfIndex = 0;
1056  if (plan.getIndicesTo().is_null()) {
1057 
1058 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1059  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_);
1060 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1061 
1062  // Data are already blocked (laid out) by process, so we don't
1063  // need a separate send buffer (besides the exports array).
1064  for (size_t i = 0; i < numBlocks; ++i) {
1065  size_t p = i + procIndex;
1066  if (p > (numBlocks - 1)) {
1067  p -= numBlocks;
1068  }
1069 
1070  if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) {
1071  exports_view_type tmpSend =
1072  subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);
1073 
1074  if (sendType == Details::DISTRIBUTOR_ISEND) {
1075  exports_view_type tmpSendBuf =
1076  subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
1077  requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
1078  mpiTag_, *plan.getComm()));
1079  }
1080  else { // DISTRIBUTOR_SEND
1081  send<int> (tmpSend,
1082  as<int> (tmpSend.size ()),
1083  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1084  }
1085  }
1086  else { // "Sending" the message to myself
1087  selfNum = p;
1088  }
1089  }
1090 
1091  if (plan.hasSelfMessage()) {
1092  deep_copy_offset(imports, exports, selfReceiveOffset,
1093  sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
1094  }
1095  }
1096  else { // data are not blocked by proc, use send buffer
1097 
1098 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1099  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_);
1100 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1101 
1102  // FIXME (mfh 05 Mar 2013) This may be broken for Isend.
1103  typedef typename ExpView::non_const_value_type Packet;
1104  typedef typename ExpView::array_layout Layout;
1105  typedef typename ExpView::device_type Device;
1106  typedef typename ExpView::memory_traits Mem;
1107 
1108  // This buffer is long enough for only one message at a time.
1109  // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
1110  // of sendType requested by user.
1111  // This code path formerly errored out with message:
1112  // Tpetra::Distributor::doPosts(4-arg, Kokkos):
1113  // The "send buffer" code path
1114  // doesn't currently work with nonblocking sends.
1115  // Now, we opt to just do the communication in a way that works.
1116 #ifdef HAVE_TPETRA_DEBUG
1117  if (sendType != Details::DISTRIBUTOR_SEND) {
1118  if (plan.getComm()->getRank() == 0)
1119  std::cout << "The requested Tpetra send type "
1120  << DistributorSendTypeEnumToString(sendType)
1121  << " requires Distributor data to be ordered by"
1122  << " the receiving processor rank. Since these"
1123  << " data are not ordered, Tpetra will use Send"
1124  << " instead." << std::endl;
1125  }
1126 #endif
1127 
1128  Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
1129  maxNumPackets);
1130 
1131  Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
1132  size_t ioffset = 0;
1133  for (int j=0; j<numExportPacketsPerLID.size(); ++j) {
1134  indicesOffsets[j] = ioffset;
1135  ioffset += numExportPacketsPerLID[j];
1136  }
1137 
1138  for (size_t i = 0; i < numBlocks; ++i) {
1139  size_t p = i + procIndex;
1140  if (p > (numBlocks - 1)) {
1141  p -= numBlocks;
1142  }
1143 
1144  if (plan.getProcsTo()[p] != myProcID) {
1145  size_t sendArrayOffset = 0;
1146  size_t j = plan.getStartsTo()[p];
1147  size_t numPacketsTo_p = 0;
1148  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
1149  numPacketsTo_p += numExportPacketsPerLID[j];
1150  deep_copy_offset(sendArray, exports, sendArrayOffset,
1151  indicesOffsets[j], numExportPacketsPerLID[j]);
1152  sendArrayOffset += numExportPacketsPerLID[j];
1153  }
1154  typename ExpView::execution_space().fence();
1155 
1156  if (numPacketsTo_p > 0) {
1157  ImpView tmpSend =
1158  subview_offset(sendArray, size_t(0), numPacketsTo_p);
1159 
1160  send<int> (tmpSend,
1161  as<int> (tmpSend.size ()),
1162  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1163  }
1164  }
1165  else { // "Sending" the message to myself
1166  selfNum = p;
1167  selfIndex = plan.getStartsTo()[p];
1168  }
1169  }
1170 
1171  if (plan.hasSelfMessage()) {
1172  for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
1173  deep_copy_offset(imports, exports, selfReceiveOffset,
1174  indicesOffsets[selfIndex],
1175  numExportPacketsPerLID[selfIndex]);
1176  selfReceiveOffset += numExportPacketsPerLID[selfIndex];
1177  ++selfIndex;
1178  }
1179  }
1180  }
1181 }
1182 
1183 }
1184 }
1185 
1186 #endif