Tpetra_Details_DistributorActor.hpp
1 // @HEADER
2 // ***********************************************************************
3 //
4 // Tpetra: Templated Linear Algebra Services Package
5 // Copyright (2008) Sandia Corporation
6 //
7 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
8 // the U.S. Government retains certain rights in this software.
9 //
10 // Redistribution and use in source and binary forms, with or without
11 // modification, are permitted provided that the following conditions are
12 // met:
13 //
14 // 1. Redistributions of source code must retain the above copyright
15 // notice, this list of conditions and the following disclaimer.
16 //
17 // 2. Redistributions in binary form must reproduce the above copyright
18 // notice, this list of conditions and the following disclaimer in the
19 // documentation and/or other materials provided with the distribution.
20 //
21 // 3. Neither the name of the Corporation nor the names of the
22 // contributors may be used to endorse or promote products derived from
23 // this software without specific prior written permission.
24 //
25 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
26 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
28 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
29 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
30 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
31 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
32 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
33 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
34 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
35 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 //
37 // ************************************************************************
38 // @HEADER
39 // clang-format off
40 
41 #ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
42 #define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
43 
44 #include "Tpetra_Details_DistributorPlan.hpp"
45 #include "Tpetra_Util.hpp"
46 
47 #include "Teuchos_Array.hpp"
48 #include "Teuchos_Comm.hpp"
50 #include "Teuchos_RCP.hpp"
51 #include "Teuchos_Time.hpp"
#include <climits> // INT_MAX, used in the message-size overflow checks below
#include <cstring> // memcpy, used by packOffset
52 
53 #include "Kokkos_TeuchosCommAdapters.hpp"
54 
55 #ifdef HAVE_TPETRA_MPI
56 #include "mpi.h"
57 #endif
58 
59 namespace Tpetra {
60 namespace Details {
61 
62 template <class View1, class View2>
63 constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;
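// Illustration (hypothetical, shown only as a comment): areKokkosViews is true
// exactly when both template arguments are Kokkos::View specializations, e.g.
//
//   static_assert(areKokkosViews<Kokkos::View<double*>, Kokkos::View<int*>>,
//                 "two Views: OK");
//   // areKokkosViews<Kokkos::View<double*>, std::vector<int>> is false, so the
//   // static_asserts in DistributorActor's methods reject non-View arguments.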
64 
65 class DistributorActor {
66  static constexpr int DEFAULT_MPI_TAG = 1;
67 
68 public:
69  DistributorActor();
70  DistributorActor(const DistributorActor& otherActor);
71 
72  template <class ExpView, class ImpView>
73  void doPostsAndWaits(const DistributorPlan& plan,
74  const ExpView &exports,
75  size_t numPackets,
76  const ImpView &imports);
77 
78  template <class ExpView, class ImpView>
79  void doPostsAndWaits(const DistributorPlan& plan,
80  const ExpView &exports,
81  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
82  const ImpView &imports,
83  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
84 
85  template <class ExpView, class ImpView>
86  void doPosts(const DistributorPlan& plan,
87  const ExpView& exports,
88  size_t numPackets,
89  const ImpView& imports);
90 
91  template <class ExpView, class ImpView>
92  void doPosts(const DistributorPlan& plan,
93  const ExpView &exports,
94  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
95  const ImpView &imports,
96  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
97 
98  void doWaits(const DistributorPlan& plan);
99 
100  bool isReady() const;
101 
102 private:
103 // clang-format on
104 #ifdef HAVE_TPETRA_MPI
105  template <class ExpView, class ImpView>
106  void doPostsAllToAll(const DistributorPlan &plan, const ExpView &exports,
107  size_t numPackets, const ImpView &imports);
108 
109  template <class ExpView, class ImpView>
110  void doPostsAllToAll(
111  const DistributorPlan &plan, const ExpView &exports,
112  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
113  const ImpView &imports,
114  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
115 
116 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
117  template <class ExpView, class ImpView>
118  void doPostsNbrAllToAllV(const DistributorPlan &plan, const ExpView &exports,
119  size_t numPackets, const ImpView &imports);
120 
121  template <class ExpView, class ImpView>
122  void doPostsNbrAllToAllV(
123  const DistributorPlan &plan, const ExpView &exports,
124  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
125  const ImpView &imports,
126  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID);
127 #endif // HAVE_TPETRACORE_MPI_ADVANCE
128 #endif // HAVE_TPETRA_MPI
129  // clang-format off
130  int mpiTag_;
131 
132  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requests_;
133 
134 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
135  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_;
136  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_;
137  Teuchos::RCP<Teuchos::Time> timer_doWaits_;
138  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_recvs_;
139  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_recvs_;
140  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_barrier_;
141  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_barrier_;
142  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_;
143  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_;
144  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_slow_;
145  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_slow_;
146  Teuchos::RCP<Teuchos::Time> timer_doPosts3KV_sends_fast_;
147  Teuchos::RCP<Teuchos::Time> timer_doPosts4KV_sends_fast_;
148 
150  void makeTimers();
151 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
152 };
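// Usage sketch (hypothetical; assumes an already constructed DistributorPlan
// `plan` plus Kokkos::View buffers `exports`/`imports` sized consistently with
// that plan). A caller can either post and wait in one call, or split the two
// to overlap local work with communication:
//
//   DistributorActor actor;
//   constexpr size_t numPackets = 1;                            // packets per LID
//
//   actor.doPostsAndWaits(plan, exports, numPackets, imports);  // blocking
//
//   actor.doPosts(plan, exports, numPackets, imports);          // nonblocking posts
//   /* ... overlap local computation here ... */
//   actor.doWaits(plan);                                        // complete receives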
153 
154 template <class ExpView, class ImpView>
155 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
156  const ExpView& exports,
157  size_t numPackets,
158  const ImpView& imports)
159 {
160  static_assert(areKokkosViews<ExpView, ImpView>,
161  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
162  doPosts(plan, exports, numPackets, imports);
163  doWaits(plan);
164 }
165 
166 template <class ExpView, class ImpView>
167 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
168  const ExpView& exports,
169  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
170  const ImpView& imports,
171  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
172 {
173  static_assert(areKokkosViews<ExpView, ImpView>,
174  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
175  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
176  doWaits(plan);
177 }
178 
179 template <typename ViewType>
180 using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;
181 
182 template <typename DstViewType, typename SrcViewType>
183 using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
184  HostAccessibility<SrcViewType>::accessible>;
185 
186 template <typename DstViewType, typename SrcViewType>
187 using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
188  !HostAccessibility<SrcViewType>::accessible>;
189 
190 template <typename DstViewType, typename SrcViewType>
191 enableIfHostAccessible<DstViewType, SrcViewType>
192 packOffset(const DstViewType& dst,
193  const SrcViewType& src,
194  const size_t dst_offset,
195  const size_t src_offset,
196  const size_t size)
197 {
198  memcpy(dst.data()+dst_offset, src.data()+src_offset, size*sizeof(typename DstViewType::value_type));
199 }
200 
201 template <typename DstViewType, typename SrcViewType>
202 enableIfNotHostAccessible<DstViewType, SrcViewType>
203 packOffset(const DstViewType& dst,
204  const SrcViewType& src,
205  const size_t dst_offset,
206  const size_t src_offset,
207  const size_t size)
208 {
209  Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
210 }
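// Overload-selection example (illustrative): the enable_if aliases above pick
// the memcpy overload when both Views are host-accessible, and the
// deep_copy_offset overload otherwise.
//
//   Kokkos::View<double*, Kokkos::HostSpace> h_dst("dst", 32), h_src("src", 32);
//   packOffset(h_dst, h_src, 0, 8, 16);   // both host-accessible -> memcpy
//
//   // With device-only memory (e.g. Kokkos::CudaSpace in a CUDA build), the
//   // same call resolves to the Kokkos::Compat::deep_copy_offset overload.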
211 
212 // clang-format on
213 #ifdef HAVE_TPETRA_MPI
214 template <class ExpView, class ImpView>
215 void DistributorActor::doPostsAllToAll(const DistributorPlan &plan,
216  const ExpView &exports,
217  size_t numPackets,
218  const ImpView &imports) {
219  using size_type = Teuchos::Array<size_t>::size_type;
220 
221  TEUCHOS_TEST_FOR_EXCEPTION(
222  !plan.getIndicesTo().is_null(), std::runtime_error,
223  "Send Type=\"Alltoall\" only works for fast-path communication.");
224 
225  auto comm = plan.getComm();
226  const int myRank = comm->getRank();
227  std::vector<int> sendcounts(comm->getSize(), 0);
228  std::vector<int> sdispls(comm->getSize(), 0);
229  std::vector<int> recvcounts(comm->getSize(), 0);
230  std::vector<int> rdispls(comm->getSize(), 0);
231 
232  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
233  for (size_t p = 0; p < numBlocks; ++p) {
234  sdispls[plan.getProcsTo()[p]] = plan.getStartsTo()[p] * numPackets;
235  size_t sendcount = plan.getLengthsTo()[p] * numPackets;
236  // sendcount is converted down to int, so make sure it can be represented
237  TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
238  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
239  "Send count for block "
240  << p << " (" << sendcount
241  << ") is too large "
242  "to be represented as int.");
243  sendcounts[plan.getProcsTo()[p]] = static_cast<int>(sendcount);
244  }
245 
246  const size_type actualNumReceives =
247  Teuchos::as<size_type>(plan.getNumReceives()) +
248  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
249  size_t curBufferOffset = 0;
250  for (size_type i = 0; i < actualNumReceives; ++i) {
251  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
252  TEUCHOS_TEST_FOR_EXCEPTION(
253  curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
254  std::logic_error,
255  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
256  "Exceeded size of 'imports' array in packing loop on Process "
257  << myRank << ". imports.size() = " << imports.size()
258  << " < "
259  "curBufferOffset("
260  << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
261  rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
262  // curBufLen is converted down to int, so make sure it can be represented
263  TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
264  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
265  "Recv count for receive "
266  << i << " (" << curBufLen
267  << ") is too large "
268  "to be represented as int.");
269  recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(curBufLen);
270  curBufferOffset += curBufLen;
271  }
272 
273  using T = typename ExpView::non_const_value_type;
274  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
275 
276 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
277  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
278  MPIX_Comm *mpixComm = *plan.getMPIXComm();
279  TEUCHOS_TEST_FOR_EXCEPTION(
280  !mpixComm, std::runtime_error,
281  "plan's MPIX_Comm null in doPostsAllToAll, but "
282  "DISTRIBUTOR_MPIADVANCE_ALLTOALL set: plan.howInitialized()="
283  << DistributorHowInitializedEnumToString(plan.howInitialized()));
284 
285  const int err = MPIX_Alltoallv(
286  exports.data(), sendcounts.data(), sdispls.data(), rawType,
287  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
288 
289  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
290  "MPIX_Alltoallv failed with error \""
291  << Teuchos::mpiErrorCodeToString(err)
292  << "\".");
293 
294  return;
295  }
296 #endif
297  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
298  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
299  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
300  mpiComm->getRawMpiComm();
301 
302  const int err = MPI_Alltoallv(
303  exports.data(), sendcounts.data(), sdispls.data(), rawType,
304  imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
305 
306  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
307  "MPI_Alltoallv failed with error \""
308  << Teuchos::mpiErrorCodeToString(err)
309  << "\".");
310 
311  return;
312 }
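// Worked example (illustrative numbers only): with numPackets = 2 and a plan
// that sends blocks of lengths {3, 5}, starting at export offsets {0, 3}, to
// processes {1, 2}, the send-side loop above produces
//
//   sdispls[1]    = 0 * 2 = 0      sdispls[2]    = 3 * 2 = 6
//   sendcounts[1] = 3 * 2 = 6      sendcounts[2] = 5 * 2 = 10
//
// (all other entries stay 0), and the receive-side loop fills recvcounts and
// rdispls analogously from getLengthsFrom() and a running offset into imports.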
313 
314 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
315 template <class ExpView, class ImpView>
316 void DistributorActor::doPostsNbrAllToAllV(const DistributorPlan &plan,
317  const ExpView &exports,
318  size_t numPackets,
319  const ImpView &imports) {
320  TEUCHOS_TEST_FOR_EXCEPTION(
321  !plan.getIndicesTo().is_null(), std::runtime_error,
322  "Send Type=\"Alltoall\" only works for fast-path communication.");
323 
324  const int myRank = plan.getComm()->getRank();
325  MPIX_Comm *mpixComm = *plan.getMPIXComm();
326 
327  const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
328  const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
329  std::vector<int> sendcounts(numSends, 0);
330  std::vector<int> sdispls(numSends, 0);
331  std::vector<int> recvcounts(numRecvs, 0);
332  std::vector<int> rdispls(numRecvs, 0);
333 
334  for (size_t p = 0; p < numSends; ++p) {
335  sdispls[p] = plan.getStartsTo()[p] * numPackets;
336  const size_t sendcount = plan.getLengthsTo()[p] * numPackets;
337  // sendcount is converted down to int, so make sure it can be represented
338  TEUCHOS_TEST_FOR_EXCEPTION(sendcount > size_t(INT_MAX), std::logic_error,
339  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
340  "Send count for block "
341  << p << " (" << sendcount
342  << ") is too large "
343  "to be represented as int.");
344  sendcounts[p] = static_cast<int>(sendcount);
345  }
346 
347  size_t curBufferOffset = 0;
348  for (size_t i = 0; i < numRecvs; ++i) {
349  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
350  TEUCHOS_TEST_FOR_EXCEPTION(
351  curBufferOffset + curBufLen > static_cast<size_t>(imports.size()),
352  std::logic_error,
353  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
354  "Exceeded size of 'imports' array in packing loop on Process "
355  << myRank << ". imports.size() = " << imports.size()
356  << " < "
357  "curBufferOffset("
358  << curBufferOffset << ") + curBufLen(" << curBufLen << ").");
359  rdispls[i] = curBufferOffset;
360  // curBufLen is converted down to int, so make sure it can be represented
361  TEUCHOS_TEST_FOR_EXCEPTION(curBufLen > size_t(INT_MAX), std::logic_error,
362  "Tpetra::Distributor::doPosts(3 args, Kokkos): "
363  "Recv count for receive "
364  << i << " (" << curBufLen
365  << ") is too large "
366  "to be represented as int.");
367  recvcounts[i] = static_cast<int>(curBufLen);
368  curBufferOffset += curBufLen;
369  }
370 
371  using T = typename ExpView::non_const_value_type;
372  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
373 
374  const int err = MPIX_Neighbor_alltoallv(
375  exports.data(), sendcounts.data(), sdispls.data(), rawType,
376  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
377 
378  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
379  "MPIX_Neighbor_alltoallv failed with error \""
380  << Teuchos::mpiErrorCodeToString(err)
381  << "\".");
382 }
383 #endif // HAVE_TPETRACORE_MPI_ADVANCE
384 #endif // HAVE_TPETRA_MPI
385 // clang-format off
386 
387 template <class ExpView, class ImpView>
388 void DistributorActor::doPosts(const DistributorPlan& plan,
389  const ExpView& exports,
390  size_t numPackets,
391  const ImpView& imports)
392 {
393  static_assert(areKokkosViews<ExpView, ImpView>,
394  "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
395  using Teuchos::Array;
396  using Teuchos::as;
397  using Teuchos::FancyOStream;
398  using Teuchos::includesVerbLevel;
399  using Teuchos::ireceive;
400  using Teuchos::isend;
401  using Teuchos::send;
402  using Teuchos::TypeNameTraits;
403  using Teuchos::typeName;
404  using std::endl;
405  using Kokkos::Compat::create_const_view;
406  using Kokkos::Compat::create_view;
407  using Kokkos::Compat::subview_offset;
408  using Kokkos::Compat::deep_copy_offset;
409  typedef Array<size_t>::size_type size_type;
410  typedef ExpView exports_view_type;
411  typedef ImpView imports_view_type;
412 
413 #ifdef KOKKOS_ENABLE_CUDA
414  static_assert
415  (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
416  ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
417  "Please do not use Tpetra::Distributor with UVM allocations. "
418  "See Trilinos GitHub issue #1088.");
419 #endif // KOKKOS_ENABLE_CUDA
420 
421 #ifdef KOKKOS_ENABLE_SYCL
422  static_assert
423  (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
424  ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
425  "Please do not use Tpetra::Distributor with SharedUSM allocations. "
426  "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
427 #endif // KOKKOS_ENABLE_SYCL
428 
429 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
430  Teuchos::TimeMonitor timeMon (*timer_doPosts3KV_);
431 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
432 
433  const int myRank = plan.getComm()->getRank ();
434  // Run-time configurable parameters that come from the input
435  // ParameterList set by setParameterList().
436  const Details::EDistributorSendType sendType = plan.getSendType();
437 
438 // clang-format on
439 #if defined(HAVE_TPETRA_MPI)
440  // All-to-all communication layout is quite different from
441  // point-to-point, so we handle it separately.
442 
443  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
444  doPostsAllToAll(plan, exports, numPackets, imports);
445  return;
446  }
447 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
448  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
449  doPostsAllToAll(plan, exports, numPackets, imports);
450  return;
451  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
452  doPostsNbrAllToAllV(plan, exports, numPackets, imports);
453  return;
454  }
455 #endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)
456 // clang-format off
457 
458 #else // HAVE_TPETRA_MPI
459  if (plan.hasSelfMessage()) {
460  // This is how we "send a message to ourself": we copy from
461  // the export buffer to the import buffer. That saves
462  // Teuchos::Comm implementations other than MpiComm (in
463  // particular, SerialComm) the trouble of implementing self
464  // messages correctly. (To do this right, SerialComm would
465  // need internal buffer space for messages, keyed on the
466  // message's tag.)
467  size_t selfReceiveOffset = 0;
468  deep_copy_offset(imports, exports, selfReceiveOffset,
469  plan.getStartsTo()[0]*numPackets,
470  plan.getLengthsTo()[0]*numPackets);
471  }
472  // should we just return here?
473  // likely not as comm could be a serial comm
474 #endif // HAVE_TPETRA_MPI
475 
476  size_t selfReceiveOffset = 0;
477 
478 #ifdef HAVE_TPETRA_DEBUG
479  TEUCHOS_TEST_FOR_EXCEPTION
480  (requests_.size () != 0,
481  std::logic_error,
482  "Tpetra::Distributor::doPosts(3 args, Kokkos): Process "
483  << myRank << ": requests_.size() = " << requests_.size () << " != 0.");
484 #endif // HAVE_TPETRA_DEBUG
485 
486  // Distributor uses requests_.size() as the number of outstanding
487  // nonblocking message requests, so we resize to zero to maintain
488  // this invariant.
489  //
490  // getNumReceives() does _not_ include the self message, if there is
491  // one. Here, we do actually send a message to ourselves, so we
492  // include any self message in the "actual" number of receives to
493  // post.
494  //
495  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
496  // doesn't (re)allocate its array of requests. That happens in
497  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
498  // demand), or Resize_().
499  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
500  as<size_type> (plan.hasSelfMessage() ? 1 : 0);
501  requests_.resize (0);
502 
503  // Post the nonblocking receives. It's common MPI wisdom to post
504  // receives before sends. In MPI terms, this means favoring
505  // adding to the "posted queue" (of receive requests) over adding
506  // to the "unexpected queue" (of arrived messages not yet matched
507  // with a receive).
508  {
509 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
510  Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3KV_recvs_);
511 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
512 
513  size_t curBufferOffset = 0;
514  for (size_type i = 0; i < actualNumReceives; ++i) {
515  const size_t curBufLen = plan.getLengthsFrom()[i] * numPackets;
516  if (plan.getProcsFrom()[i] != myRank) {
517  // If my process is receiving these packet(s) from another
518  // process (not a self-receive):
519  //
520  // 1. Set up the persisting view (recvBuf) of the imports
521  // array, given the offset and size (total number of
522  // packets from process getProcsFrom()[i]).
523  // 2. Start the Irecv and save the resulting request.
524  TEUCHOS_TEST_FOR_EXCEPTION(
525  curBufferOffset + curBufLen > static_cast<size_t> (imports.size ()),
526  std::logic_error, "Tpetra::Distributor::doPosts(3 args, Kokkos): "
527  "Exceeded size of 'imports' array in packing loop on Process " <<
528  myRank << ". imports.size() = " << imports.size () << " < "
529  "curBufferOffset(" << curBufferOffset << ") + curBufLen(" <<
530  curBufLen << ").");
531  imports_view_type recvBuf =
532  subview_offset (imports, curBufferOffset, curBufLen);
533  requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
534  mpiTag_, *plan.getComm()));
535  }
536  else { // Receiving from myself
537  selfReceiveOffset = curBufferOffset; // Remember the self-recv offset
538  }
539  curBufferOffset += curBufLen;
540  }
541  }
542 
543 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
544  Teuchos::TimeMonitor timeMonSends (*timer_doPosts3KV_sends_);
545 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
546 
547  // setup scan through getProcsTo() list starting with higher numbered procs
548  // (should help balance message traffic)
549  //
550  // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
551  // It doesn't depend on the input at all.
552  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
553  size_t procIndex = 0;
554  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
555  ++procIndex;
556  }
557  if (procIndex == numBlocks) {
558  procIndex = 0;
559  }
560 
561  size_t selfNum = 0;
562  size_t selfIndex = 0;
563 
564  if (plan.getIndicesTo().is_null()) {
565 
566 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
567  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_fast_);
568 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
569 
570  // Data are already blocked (laid out) by process, so we don't
571  // need a separate send buffer (besides the exports array).
572  for (size_t i = 0; i < numBlocks; ++i) {
573  size_t p = i + procIndex;
574  if (p > (numBlocks - 1)) {
575  p -= numBlocks;
576  }
577 
578  if (plan.getProcsTo()[p] != myRank) {
579  exports_view_type tmpSend = subview_offset(
580  exports, plan.getStartsTo()[p]*numPackets, plan.getLengthsTo()[p]*numPackets);
581 
582  if (sendType == Details::DISTRIBUTOR_ISEND) {
583  // NOTE: This looks very similar to the tmpSend above, but removing
584  // tmpSendBuf and uses tmpSend leads to a performance hit on Arm
585  // SerialNode builds
586  exports_view_type tmpSendBuf =
587  subview_offset (exports, plan.getStartsTo()[p] * numPackets,
588  plan.getLengthsTo()[p] * numPackets);
589  requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
590  mpiTag_, *plan.getComm()));
591  }
592  else { // DISTRIBUTOR_SEND
593  send<int> (tmpSend,
594  as<int> (tmpSend.size ()),
595  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
596  }
597  }
598  else { // "Sending" the message to myself
599  selfNum = p;
600  }
601  }
602 
603  if (plan.hasSelfMessage()) {
604  // This is how we "send a message to ourself": we copy from
605  // the export buffer to the import buffer. That saves
606  // Teuchos::Comm implementations other than MpiComm (in
607  // particular, SerialComm) the trouble of implementing self
608  // messages correctly. (To do this right, SerialComm would
609  // need internal buffer space for messages, keyed on the
610  // message's tag.)
611  deep_copy_offset(imports, exports, selfReceiveOffset,
612  plan.getStartsTo()[selfNum]*numPackets,
613  plan.getLengthsTo()[selfNum]*numPackets);
614  }
615 
616  }
617  else { // data are not blocked by proc, use send buffer
618 
619 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
620  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts3KV_sends_slow_);
621 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
622 
623  typedef typename ExpView::non_const_value_type Packet;
624  typedef typename ExpView::array_layout Layout;
625  typedef typename ExpView::device_type Device;
626  typedef typename ExpView::memory_traits Mem;
627 
628  // This buffer is long enough for only one message at a time.
629  // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
630  // of sendType requested by user.
631  // This code path formerly errored out with message:
632  // Tpetra::Distributor::doPosts(3 args, Kokkos):
633  // The "send buffer" code path
634  // doesn't currently work with nonblocking sends.
635  // Now, we opt to just do the communication in a way that works.
636 #ifdef HAVE_TPETRA_DEBUG
637  if (sendType != Details::DISTRIBUTOR_SEND) {
638  if (plan.getComm()->getRank() == 0)
639  std::cout << "The requested Tpetra send type "
640  << DistributorSendTypeEnumToString(sendType)
641  << " requires Distributor data to be ordered by"
642  << " the receiving processor rank. Since these"
643  << " data are not ordered, Tpetra will use Send"
644  << " instead." << std::endl;
645  }
646 #endif
647 
648  Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
649  plan.getMaxSendLength() * numPackets);
650 
651  for (size_t i = 0; i < numBlocks; ++i) {
652  size_t p = i + procIndex;
653  if (p > (numBlocks - 1)) {
654  p -= numBlocks;
655  }
656 
657  if (plan.getProcsTo()[p] != myRank) {
658  size_t sendArrayOffset = 0;
659  size_t j = plan.getStartsTo()[p];
660  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
661  packOffset(sendArray, exports, sendArrayOffset, plan.getIndicesTo()[j]*numPackets, numPackets);
662  sendArrayOffset += numPackets;
663  }
664  typename ExpView::execution_space().fence();
665 
666  ImpView tmpSend =
667  subview_offset(sendArray, size_t(0), plan.getLengthsTo()[p]*numPackets);
668 
669  send<int> (tmpSend,
670  as<int> (tmpSend.size ()),
671  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
672  }
673  else { // "Sending" the message to myself
674  selfNum = p;
675  selfIndex = plan.getStartsTo()[p];
676  }
677  }
678 
679  if (plan.hasSelfMessage()) {
680  for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
681  packOffset(imports, exports, selfReceiveOffset, plan.getIndicesTo()[selfIndex]*numPackets, numPackets);
682  ++selfIndex;
683  selfReceiveOffset += numPackets;
684  }
685  }
686  }
687 }
688 
689 // clang-format on
690 #ifdef HAVE_TPETRA_MPI
691 template <class ExpView, class ImpView>
692 void DistributorActor::doPostsAllToAll(
693  const DistributorPlan &plan, const ExpView &exports,
694  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
695  const ImpView &imports,
696  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
697  TEUCHOS_TEST_FOR_EXCEPTION(
698  !plan.getIndicesTo().is_null(), std::runtime_error,
699  "Send Type=\"Alltoall\" only works for fast-path communication.");
700 
701  using size_type = Teuchos::Array<size_t>::size_type;
702 
703  auto comm = plan.getComm();
704  std::vector<int> sendcounts(comm->getSize(), 0);
705  std::vector<int> sdispls(comm->getSize(), 0);
706  std::vector<int> recvcounts(comm->getSize(), 0);
707  std::vector<int> rdispls(comm->getSize(), 0);
708 
709  size_t curPKToffset = 0;
710  for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
711  sdispls[plan.getProcsTo()[pp]] = curPKToffset;
712  size_t numPackets = 0;
713  for (size_t j = plan.getStartsTo()[pp];
714  j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
715  numPackets += numExportPacketsPerLID[j];
716  }
717  // numPackets is converted down to int, so make sure it can be represented
718  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
719  "Tpetra::Distributor::doPosts(4 args, Kokkos): "
720  "Send count for send "
721  << pp << " (" << numPackets
722  << ") is too large "
723  "to be represented as int.");
724  sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
725  curPKToffset += numPackets;
726  }
727 
728  const size_type actualNumReceives =
729  Teuchos::as<size_type>(plan.getNumReceives()) +
730  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
731 
732  size_t curBufferOffset = 0;
733  size_t curLIDoffset = 0;
734  for (size_type i = 0; i < actualNumReceives; ++i) {
735  size_t totalPacketsFrom_i = 0;
736  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
737  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
738  }
739  curLIDoffset += plan.getLengthsFrom()[i];
740 
741  rdispls[plan.getProcsFrom()[i]] = curBufferOffset;
742  // totalPacketsFrom_i is converted down to int, so make sure it can be
743  // represented
744  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
745  std::logic_error,
746  "Tpetra::Distributor::doPosts(4 args, Kokkos): "
747  "Recv count for receive "
748  << i << " (" << totalPacketsFrom_i
749  << ") is too large "
750  "to be represented as int.");
751  recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
752  curBufferOffset += totalPacketsFrom_i;
753  }
754 
755  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
756  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
757  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
758  mpiComm->getRawMpiComm();
759  using T = typename ExpView::non_const_value_type;
760  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
761 
762 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
763  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
764  MPIX_Comm *mpixComm = *plan.getMPIXComm();
765  TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
766  "MPIX_Comm is null in doPostsAllToAll at "
767  << __FILE__ << ":" << __LINE__);
768 
769  const int err = MPIX_Alltoallv(
770  exports.data(), sendcounts.data(), sdispls.data(), rawType,
771  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
772 
773  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
774  "MPIX_Alltoallv failed with error \""
775  << Teuchos::mpiErrorCodeToString(err)
776  << "\".");
777 
778  return;
779  }
780 #endif // HAVE_TPETRACORE_MPI_ADVANCE
781 
782  const int err = MPI_Alltoallv(
783  exports.data(), sendcounts.data(), sdispls.data(), rawType,
784  imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
785 
786  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
787  "MPI_Alltoallv failed with error \""
788  << Teuchos::mpiErrorCodeToString(err)
789  << "\".");
790 }
791 
792 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
793 template <class ExpView, class ImpView>
794 void DistributorActor::doPostsNbrAllToAllV(
795  const DistributorPlan &plan, const ExpView &exports,
796  const Teuchos::ArrayView<const size_t> &numExportPacketsPerLID,
797  const ImpView &imports,
798  const Teuchos::ArrayView<const size_t> &numImportPacketsPerLID) {
799  TEUCHOS_TEST_FOR_EXCEPTION(
800  !plan.getIndicesTo().is_null(), std::runtime_error,
801  "Send Type=\"Alltoall\" only works for fast-path communication.");
802 
803  const Teuchos_Ordinal numSends = plan.getProcsTo().size();
804  const Teuchos_Ordinal numRecvs = plan.getProcsFrom().size();
805 
806  auto comm = plan.getComm();
807  std::vector<int> sendcounts(numSends, 0);
808  std::vector<int> sdispls(numSends, 0);
809  std::vector<int> recvcounts(numRecvs, 0);
810  std::vector<int> rdispls(numRecvs, 0);
811 
812  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
813  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
814  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
815  mpiComm->getRawMpiComm();
816  using T = typename ExpView::non_const_value_type;
817  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
818 
819  // unlike standard alltoall, entry `i` in sdispls and sendcounts
820  // refers to the ith participating rank, rather than rank i
821  size_t curPKToffset = 0;
822  for (Teuchos_Ordinal pp = 0; pp < numSends; ++pp) {
823  sdispls[pp] = curPKToffset;
824  size_t numPackets = 0;
825  for (size_t j = plan.getStartsTo()[pp];
826  j < plan.getStartsTo()[pp] + plan.getLengthsTo()[pp]; ++j) {
827  numPackets += numExportPacketsPerLID[j];
828  }
829  // numPackets is converted down to int, so make sure it can be represented
830  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
831  "Tpetra::Distributor::doPosts(4 args, Kokkos): "
832  "Send count for send "
833  << pp << " (" << numPackets
834  << ") is too large "
835  "to be represented as int.");
836  sendcounts[pp] = static_cast<int>(numPackets);
837  curPKToffset += numPackets;
838  }
839  size_t curBufferOffset = 0;
840  size_t curLIDoffset = 0;
841  for (Teuchos_Ordinal i = 0; i < numRecvs; ++i) {
842  size_t totalPacketsFrom_i = 0;
843  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
844  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset + j];
845  }
846  curLIDoffset += plan.getLengthsFrom()[i];
847 
848  rdispls[i] = curBufferOffset;
849  // totalPacketsFrom_i is converted down to int, so make sure it can be
850  // represented
851  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
852  std::logic_error,
853  "Tpetra::Distributor::doPosts(4 args, Kokkos): "
854  "Recv count for receive "
855  << i << " (" << totalPacketsFrom_i
856  << ") is too large "
857  "to be represented as int.");
858  recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
859  curBufferOffset += totalPacketsFrom_i;
860  }
861 
862  MPIX_Comm *mpixComm = *plan.getMPIXComm();
863  const int err = MPIX_Neighbor_alltoallv(
864  exports.data(), sendcounts.data(), sdispls.data(), rawType,
865  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
866 
867  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
868  "MPIX_Neighbor_alltoallv failed with error \""
869  << Teuchos::mpiErrorCodeToString(err)
870  << "\".");
871 }
872 #endif // HAVE_TPETRACORE_MPI_ADVANCE
873 #endif // HAVE_TPETRA_MPI
874  // clang-format off
875 
876 template <class ExpView, class ImpView>
877 void DistributorActor::doPosts(const DistributorPlan& plan,
878  const ExpView &exports,
879  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
880  const ImpView &imports,
881  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
882 {
883  static_assert(areKokkosViews<ExpView, ImpView>,
884  "Data arrays for DistributorActor::doPosts must be Kokkos::Views");
885  using Teuchos::Array;
886  using Teuchos::as;
887  using Teuchos::ireceive;
888  using Teuchos::isend;
889  using Teuchos::send;
890  using Teuchos::TypeNameTraits;
891  using std::endl;
892  using Kokkos::Compat::create_const_view;
893  using Kokkos::Compat::create_view;
894  using Kokkos::Compat::subview_offset;
895  using Kokkos::Compat::deep_copy_offset;
896  typedef Array<size_t>::size_type size_type;
897  typedef ExpView exports_view_type;
898  typedef ImpView imports_view_type;
899 
900 #ifdef KOKKOS_ENABLE_CUDA
901  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
902  ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
903  "Please do not use Tpetra::Distributor with UVM "
904  "allocations. See GitHub issue #1088.");
905 #endif // KOKKOS_ENABLE_CUDA
906 
907 #ifdef KOKKOS_ENABLE_SYCL
908  static_assert (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
909  ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
910  "Please do not use Tpetra::Distributor with SharedUSM "
911  "allocations. See GitHub issue #1088 (corresponding to CUDA).");
912 #endif // KOKKOS_ENABLE_SYCL
913 
914 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
915  Teuchos::TimeMonitor timeMon (*timer_doPosts4KV_);
916 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
917 
918  // Run-time configurable parameters that come from the input
919  // ParameterList set by setParameterList().
920  const Details::EDistributorSendType sendType = plan.getSendType();
921 
922 #ifdef HAVE_TPETRA_MPI
923  // All-to-all communication layout is quite different from
924  // point-to-point, so we handle it separately.
925  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
926  doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
927  return;
928  }
929 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
930  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL)
931  {
932  doPostsAllToAll(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
933  return;
934  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
935  doPostsNbrAllToAllV(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
936  return;
937  }
938 #endif
939 
940 #else // HAVE_TPETRA_MPI
941  if (plan.hasSelfMessage()) {
942 
943  size_t selfReceiveOffset = 0;
944 
945  // setup arrays containing starting-offsets into exports for each send,
946  // and num-packets-to-send for each send.
947  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
948  size_t maxNumPackets = 0;
949  size_t curPKToffset = 0;
950  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
951  sendPacketOffsets[pp] = curPKToffset;
952  size_t numPackets = 0;
953  for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
954  numPackets += numExportPacketsPerLID[j];
955  }
956  if (numPackets > maxNumPackets) maxNumPackets = numPackets;
957  packetsPerSend[pp] = numPackets;
958  curPKToffset += numPackets;
959  }
960 
961  deep_copy_offset(imports, exports, selfReceiveOffset,
962  sendPacketOffsets[0], packetsPerSend[0]);
963  }
964 #endif // HAVE_TPETRA_MPI
965 
966  const int myProcID = plan.getComm()->getRank ();
967  size_t selfReceiveOffset = 0;
968 
969 #ifdef HAVE_TPETRA_DEBUG
970  // Different messages may have different numbers of packets.
971  size_t totalNumImportPackets = 0;
972  for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
973  totalNumImportPackets += numImportPacketsPerLID[ii];
974  }
975  TEUCHOS_TEST_FOR_EXCEPTION(
976  imports.extent (0) < totalNumImportPackets, std::runtime_error,
977  "Tpetra::Distributor::doPosts(4 args, Kokkos): The 'imports' array must have "
978  "enough entries to hold the expected number of import packets. "
979  "imports.extent(0) = " << imports.extent (0) << " < "
980  "totalNumImportPackets = " << totalNumImportPackets << ".");
981  TEUCHOS_TEST_FOR_EXCEPTION
982  (requests_.size () != 0, std::logic_error, "Tpetra::Distributor::"
983  "doPosts(4 args, Kokkos): Process " << myProcID << ": requests_.size () = "
984  << requests_.size () << " != 0.");
985 #endif // HAVE_TPETRA_DEBUG
986  // Distributor uses requests_.size() as the number of outstanding
987  // nonblocking message requests, so we resize to zero to maintain
988  // this invariant.
989  //
990  // getNumReceives() does _not_ include the self message, if there is
991  // one. Here, we do actually send a message to ourselves, so we
992  // include any self message in the "actual" number of receives to
993  // post.
994  //
995  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
996  // doesn't (re)allocate its array of requests. That happens in
997  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
998  // demand), or Resize_().
999  const size_type actualNumReceives = as<size_type> (plan.getNumReceives()) +
1000  as<size_type> (plan.hasSelfMessage() ? 1 : 0);
1001  requests_.resize (0);
1002 
1003  // Post the nonblocking receives. It's common MPI wisdom to post
1004  // receives before sends. In MPI terms, this means favoring
1005  // adding to the "posted queue" (of receive requests) over adding
1006  // to the "unexpected queue" (of arrived messages not yet matched
1007  // with a receive).
1008  {
1009 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1010  Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4KV_recvs_);
1011 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1012 
1013  size_t curBufferOffset = 0;
1014  size_t curLIDoffset = 0;
1015  for (size_type i = 0; i < actualNumReceives; ++i) {
1016  size_t totalPacketsFrom_i = 0;
1017  for (size_t j = 0; j < plan.getLengthsFrom()[i]; ++j) {
1018  totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
1019  }
1020  // totalPacketsFrom_i is converted down to int, so make sure it can be represented
1021  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
1022  std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
1023  "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large "
1024  "to be represented as int.");
1025  curLIDoffset += plan.getLengthsFrom()[i];
1026  if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
1027  // If my process is receiving these packet(s) from another
1028  // process (not a self-receive), and if there is at least
1029  // one packet to receive:
1030  //
1031  // 1. Set up the persisting view (recvBuf) into the imports
1032  // array, given the offset and size (total number of
1033  // packets from process getProcsFrom()[i]).
1034  // 2. Start the Irecv and save the resulting request.
1035  imports_view_type recvBuf =
1036  subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
1037  requests_.push_back (ireceive<int> (recvBuf, plan.getProcsFrom()[i],
1038  mpiTag_, *plan.getComm()));
1039  }
1040  else { // Receiving these packet(s) from myself
1041  selfReceiveOffset = curBufferOffset; // Remember the offset
1042  }
1043  curBufferOffset += totalPacketsFrom_i;
1044  }
1045  }
1046 
1047 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1048  Teuchos::TimeMonitor timeMonSends (*timer_doPosts4KV_sends_);
1049 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1050 
1051  // setup arrays containing starting-offsets into exports for each send,
1052  // and num-packets-to-send for each send.
1053  Array<size_t> sendPacketOffsets(plan.getNumSends(),0), packetsPerSend(plan.getNumSends(),0);
1054  size_t maxNumPackets = 0;
1055  size_t curPKToffset = 0;
1056  for (size_t pp=0; pp<plan.getNumSends(); ++pp) {
1057  sendPacketOffsets[pp] = curPKToffset;
1058  size_t numPackets = 0;
1059  for (size_t j=plan.getStartsTo()[pp]; j<plan.getStartsTo()[pp]+plan.getLengthsTo()[pp]; ++j) {
1060  numPackets += numExportPacketsPerLID[j];
1061  }
1062  if (numPackets > maxNumPackets) maxNumPackets = numPackets;
1063  // numPackets will be used as a message length, so make sure it can be represented as int
1064  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX),
1065  std::logic_error, "Tpetra::Distributor::doPosts(4 args, Kokkos): "
1066  "packetsPerSend[" << pp << "] = " << numPackets << " is too large "
1067  "to be represented as int.");
1068  packetsPerSend[pp] = numPackets;
1069  curPKToffset += numPackets;
1070  }
1071 
1072  // setup scan through getProcsTo() list starting with higher numbered procs
1073  // (should help balance message traffic)
1074  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
1075  size_t procIndex = 0;
1076  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myProcID)) {
1077  ++procIndex;
1078  }
1079  if (procIndex == numBlocks) {
1080  procIndex = 0;
1081  }
1082 
1083  size_t selfNum = 0;
1084  size_t selfIndex = 0;
1085  if (plan.getIndicesTo().is_null()) {
1086 
1087 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1088  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_fast_);
1089 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1090 
1091  // Data are already blocked (laid out) by process, so we don't
1092  // need a separate send buffer (besides the exports array).
1093  for (size_t i = 0; i < numBlocks; ++i) {
1094  size_t p = i + procIndex;
1095  if (p > (numBlocks - 1)) {
1096  p -= numBlocks;
1097  }
1098 
1099  if (plan.getProcsTo()[p] != myProcID && packetsPerSend[p] > 0) {
1100  exports_view_type tmpSend =
1101  subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);
1102 
1103  if (sendType == Details::DISTRIBUTOR_ISEND) {
1104  exports_view_type tmpSendBuf =
1105  subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
1106  requests_.push_back (isend<int> (tmpSendBuf, plan.getProcsTo()[p],
1107  mpiTag_, *plan.getComm()));
1108  }
1109  else { // DISTRIBUTOR_SEND
1110  send<int> (tmpSend,
1111  as<int> (tmpSend.size ()),
1112  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1113  }
1114  }
1115  else { // "Sending" the message to myself
1116  selfNum = p;
1117  }
1118  }
1119 
1120  if (plan.hasSelfMessage()) {
1121  deep_copy_offset(imports, exports, selfReceiveOffset,
1122  sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
1123  }
1124  }
1125  else { // data are not blocked by proc, use send buffer
1126 
1127 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1128  Teuchos::TimeMonitor timeMonSends2 (*timer_doPosts4KV_sends_slow_);
1129 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
1130 
1131  // FIXME (mfh 05 Mar 2013) This may be broken for Isend.
1132  typedef typename ExpView::non_const_value_type Packet;
1133  typedef typename ExpView::array_layout Layout;
1134  typedef typename ExpView::device_type Device;
1135  typedef typename ExpView::memory_traits Mem;
1136 
1137  // This buffer is long enough for only one message at a time.
1138  // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
1139  // of sendType requested by user.
1140  // This code path formerly errored out with message:
1141  // Tpetra::Distributor::doPosts(4-arg, Kokkos):
1142  // The "send buffer" code path
1143  // doesn't currently work with nonblocking sends.
1144  // Now, we opt to just do the communication in a way that works.
1145 #ifdef HAVE_TPETRA_DEBUG
1146  if (sendType != Details::DISTRIBUTOR_SEND) {
1147  if (plan.getComm()->getRank() == 0)
1148  std::cout << "The requested Tpetra send type "
1149  << DistributorSendTypeEnumToString(sendType)
1150  << " requires Distributor data to be ordered by"
1151  << " the receiving processor rank. Since these"
1152  << " data are not ordered, Tpetra will use Send"
1153  << " instead." << std::endl;
1154  }
1155 #endif
1156 
1157  Kokkos::View<Packet*,Layout,Device,Mem> sendArray ("sendArray",
1158  maxNumPackets);
1159 
1160  Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
1161  size_t ioffset = 0;
1162  for (int j=0; j<numExportPacketsPerLID.size(); ++j) {
1163  indicesOffsets[j] = ioffset;
1164  ioffset += numExportPacketsPerLID[j];
1165  }
1166 
1167  for (size_t i = 0; i < numBlocks; ++i) {
1168  size_t p = i + procIndex;
1169  if (p > (numBlocks - 1)) {
1170  p -= numBlocks;
1171  }
1172 
1173  if (plan.getProcsTo()[p] != myProcID) {
1174  size_t sendArrayOffset = 0;
1175  size_t j = plan.getStartsTo()[p];
1176  size_t numPacketsTo_p = 0;
1177  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
1178  numPacketsTo_p += numExportPacketsPerLID[j];
1179  deep_copy_offset(sendArray, exports, sendArrayOffset,
1180  indicesOffsets[j], numExportPacketsPerLID[j]);
1181  sendArrayOffset += numExportPacketsPerLID[j];
1182  }
1183  typename ExpView::execution_space().fence();
1184 
1185  if (numPacketsTo_p > 0) {
1186  ImpView tmpSend =
1187  subview_offset(sendArray, size_t(0), numPacketsTo_p);
1188 
1189  send<int> (tmpSend,
1190  as<int> (tmpSend.size ()),
1191  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
1192  }
1193  }
1194  else { // "Sending" the message to myself
1195  selfNum = p;
1196  selfIndex = plan.getStartsTo()[p];
1197  }
1198  }
1199 
1200  if (plan.hasSelfMessage()) {
1201  for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
1202  deep_copy_offset(imports, exports, selfReceiveOffset,
1203  indicesOffsets[selfIndex],
1204  numExportPacketsPerLID[selfIndex]);
1205  selfReceiveOffset += numExportPacketsPerLID[selfIndex];
1206  ++selfIndex;
1207  }
1208  }
1209  }
1210 }
1211 
1212 } // namespace Details
1213 } // namespace Tpetra
1214 
1215 #endif // TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP