Tpetra_Details_DistributorActor.hpp
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 #ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
11 #define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
12 
13 #include <optional>
14 #include <vector>
15 
16 #include "Teuchos_Assert.hpp"
17 #include "Tpetra_Details_DistributorPlan.hpp"
18 #include "Tpetra_Util.hpp"
19 
20 #include "Teuchos_Array.hpp"
21 #include "Teuchos_Comm.hpp"
22 #include "Teuchos_CommHelpers.hpp"
23 #include "Teuchos_DefaultMpiComm.hpp"
24 #include "Teuchos_RCP.hpp"
25 
26 #include "Kokkos_TeuchosCommAdapters.hpp"
27 
28 #ifdef HAVE_TPETRA_MPI
29 #include "mpi.h"
30 #include "Tpetra_Details_Ialltofewv.hpp"
31 #endif
32 
33 namespace Tpetra::Details {
34 
35 template <class View>
36 constexpr bool isKokkosView = Kokkos::is_view<View>::value;
37 
38 template <class View1, class View2>
39 constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;
40 
41 class DistributorActor {
42  static constexpr int DEFAULT_MPI_TAG = 1;
43 
44  using IndexView = DistributorPlan::IndexView;
45  using SubViewLimits = DistributorPlan::SubViewLimits;
46 
47  public:
48  DistributorActor();
49  DistributorActor(const DistributorActor& otherActor) = default;
50 
51  template <class ExpView, class ImpView>
52  void doPostsAndWaits(const DistributorPlan& plan,
53  const ExpView& exports,
54  size_t numPackets,
55  const ImpView& imports);
56 
57  template <class ExpView, class ImpView>
58  void doPostsAndWaits(const DistributorPlan& plan,
59  const ExpView& exports,
60  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
61  const ImpView& imports,
62  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
63 
64  template <class ImpView>
65  void doPostRecvs(const DistributorPlan& plan,
66  size_t numPackets,
67  const ImpView& imports);
68 
69  template <class ImpView>
70  void doPostRecvs(const DistributorPlan& plan,
71  const ImpView& imports,
72  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
73 
74  template <class ExpView, class ImpView>
75  void doPostSends(const DistributorPlan& plan,
76  const ExpView& exports,
77  size_t numPackets,
78  const ImpView& imports);
79 
80  template <class ExpView, class ImpView>
81  void doPostSends(const DistributorPlan& plan,
82  const ExpView& exports,
83  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
84  const ImpView& imports,
85  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
86 
87  template <class ExpView, class ImpView>
88  void doPosts(const DistributorPlan& plan,
89  const ExpView& exports,
90  size_t numPackets,
91  const ImpView& imports);
92 
93  template <class ExpView, class ImpView>
94  void doPosts(const DistributorPlan& plan,
95  const ExpView& exports,
96  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
97  const ImpView& imports,
98  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
99 
100  void doWaits(const DistributorPlan& plan);
101 
102  void doWaitsRecv(const DistributorPlan& plan);
103 
104  void doWaitsSend(const DistributorPlan& plan);
105 
106  void doWaitsIalltofewv(const DistributorPlan& plan);
107 
108  bool isReady() const;
109 
110  private:
111  template <class ImpView>
112  void doPostRecvsImpl(const DistributorPlan& plan,
113  const ImpView& imports,
114  const SubViewLimits& totalPacketsFrom);
115 
116  template <class ExpView, class ImpView>
117  void doPostSendsImpl(const DistributorPlan& plan,
118  const ExpView& exports,
119  const SubViewLimits& exportSubViewLimits,
120  const ImpView& imports,
121  const SubViewLimits& importSubViewLimits);
122 
123 #ifdef HAVE_TPETRA_MPI
124  template <class ExpView, class ImpView>
125  void doPostsAllToAllImpl(const DistributorPlan& plan,
126  const ExpView& exports,
127  const SubViewLimits& exportSubViewLimits,
128  const ImpView& imports,
129  const SubViewLimits& importSubViewLimits);
130 
131 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
132  template <class ExpView, class ImpView>
133  void doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
134  const ExpView& exports,
135  const SubViewLimits& exportSubViewLimits,
136  const ImpView& imports,
137  const SubViewLimits& importSubViewLimits);
138 #endif // HAVE_TPETRACORE_MPI_ADVANCE
139 
140  template <typename ExpView, typename ImpView>
141  void doPostsIalltofewvImpl(const DistributorPlan& plan,
142  const ExpView& exports,
143  const SubViewLimits& exportSubViewLimits,
144  const ImpView& imports,
145  const SubViewLimits& importSubViewLimits);
146 
147  // ialltofewv members
148  struct {
149  Details::Ialltofewv impl;
150  std::optional<Details::Ialltofewv::Req> req;
151  Teuchos::RCP<std::vector<int>> sendcounts;
152  Teuchos::RCP<std::vector<int>> sdispls;
153  Teuchos::RCP<std::vector<int>> recvcounts;
154  Teuchos::RCP<std::vector<int>> rdispls;
155  std::vector<int> roots;
156  } ialltofewv_;
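 // Note: the counts/displacements above are kept as members (rather than as
 // locals in doPostsIalltofewvImpl) because a nonblocking collective may still
 // be reading them until the corresponding wait completes; doWaitsIalltofewv()
 // is expected to release them once the request finishes.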
157 
158 #endif // HAVE_TPETRA_MPI
159 
160  int mpiTag_;
161 
162  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsRecv_;
163  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsSend_;
164 };
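// Illustrative usage sketch (not part of this header): a caller that owns a
// DistributorPlan can either post and wait in one shot, or split the phases to
// overlap communication with other work. The View names below are made up.
//
//   Tpetra::Details::DistributorActor actor;
//   // One-shot:
//   actor.doPostsAndWaits(plan, exportsView, numPackets, importsView);
//   // Or split-phase:
//   actor.doPostRecvs(plan, numPackets, importsView);
//   actor.doPostSends(plan, exportsView, numPackets, importsView);
//   /* ... other work ... */
//   actor.doWaits(plan);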
165 
166 template <class ExpView, class ImpView>
167 void DistributorActor::doPosts(const DistributorPlan& plan,
168  const ExpView& exports,
169  size_t numPackets,
170  const ImpView& imports) {
171  doPostRecvs(plan, numPackets, imports);
172  doPostSends(plan, exports, numPackets, imports);
173 }
174 
175 template <class ExpView, class ImpView>
176 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
177  const ExpView& exports,
178  size_t numPackets,
179  const ImpView& imports) {
180  static_assert(areKokkosViews<ExpView, ImpView>,
181  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
182  doPosts(plan, exports, numPackets, imports);
183  doWaits(plan);
184 }
185 
186 template <class ExpView, class ImpView>
187 void DistributorActor::doPosts(const DistributorPlan& plan,
188  const ExpView& exports,
189  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
190  const ImpView& imports,
191  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
192  doPostRecvs(plan, imports, numImportPacketsPerLID);
193  doPostSends(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
194 }
195 
196 template <class ExpView, class ImpView>
197 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
198  const ExpView& exports,
199  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
200  const ImpView& imports,
201  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
202  static_assert(areKokkosViews<ExpView, ImpView>,
203  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
204  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
205  doWaits(plan);
206 }
207 
208 template <typename ViewType>
209 using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;
210 
211 template <typename DstViewType, typename SrcViewType>
212 using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
213  HostAccessibility<SrcViewType>::accessible>;
214 
215 template <typename DstViewType, typename SrcViewType>
216 using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
217  !HostAccessibility<SrcViewType>::accessible>;
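// These aliases dispatch between the two packOffset overloads below: if both
// Views are accessible from the default host execution space, the memcpy path
// is chosen; otherwise the Kokkos deep-copy path is used.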
218 
219 template <typename DstViewType, typename SrcViewType>
220 enableIfHostAccessible<DstViewType, SrcViewType>
221 packOffset(const DstViewType& dst,
222  const SrcViewType& src,
223  const size_t dst_offset,
224  const size_t src_offset,
225  const size_t size) {
226  memcpy((void*)(dst.data() + dst_offset), src.data() + src_offset, size * sizeof(typename DstViewType::value_type));
227 }
228 
229 template <typename DstViewType, typename SrcViewType>
230 enableIfNotHostAccessible<DstViewType, SrcViewType>
231 packOffset(const DstViewType& dst,
232  const SrcViewType& src,
233  const size_t dst_offset,
234  const size_t src_offset,
235  const size_t size) {
236  Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
237 }
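// Illustrative example (hypothetical Views, not part of this header):
//   Kokkos::View<double*> dst("dst", 100), src("src", 100);
//   // Copy src(0..4) into dst(10..14); offsets and size are in elements.
//   // For host-accessible Views this resolves to the memcpy overload above.
//   packOffset(dst, src, /*dst_offset=*/10, /*src_offset=*/0, /*size=*/5);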
238 
239 #ifdef HAVE_TPETRA_MPI
240 
241 template <class ExpView, class ImpView>
242 void DistributorActor::doPostsIalltofewvImpl(const DistributorPlan& plan,
243  const ExpView& exports,
244  const SubViewLimits& exportSubViewLimits,
245  const ImpView& imports,
246  const SubViewLimits& importSubViewLimits) {
247  using size_type = Teuchos::Array<size_t>::size_type;
248  using ExportValue = typename ExpView::non_const_value_type;
249 
250  ProfilingRegion pr("Tpetra::Distributor::doPostsIalltofewvImpl");
251 
252  TEUCHOS_TEST_FOR_EXCEPTION(
253  !plan.getIndicesTo().is_null(), std::runtime_error,
254  "Send Type=\"Ialltofewv\" only works for fast-path communication.");
255 
256  TEUCHOS_TEST_FOR_EXCEPTION(
257  bool(ialltofewv_.req), std::runtime_error,
258  "This actor has an active Ialltofewv already");
259 
260  TEUCHOS_TEST_FOR_EXCEPTION(
261  bool(ialltofewv_.sendcounts), std::runtime_error,
262  "This actor has an active Ialltofewv already");
263 
264  TEUCHOS_TEST_FOR_EXCEPTION(
265  bool(ialltofewv_.sdispls), std::runtime_error,
266  "This actor has an active Ialltofewv already");
267 
268  TEUCHOS_TEST_FOR_EXCEPTION(
269  bool(ialltofewv_.recvcounts), std::runtime_error,
270  "This actor has an active Ialltofewv already");
271 
272  TEUCHOS_TEST_FOR_EXCEPTION(
273  bool(ialltofewv_.rdispls), std::runtime_error,
274  "This actor has an active Ialltofewv already");
275 
276  auto comm = plan.getComm();
277 
278  const auto& [importStarts, importLengths] = importSubViewLimits;
279  const auto& [exportStarts, exportLengths] = exportSubViewLimits;
280 
281  ialltofewv_.roots = plan.getRoots();
282  const int nroots = ialltofewv_.roots.size();
283  const int* roots = ialltofewv_.roots.data();
284  ialltofewv_.req = std::make_optional<Details::Ialltofewv::Req>();
285  ialltofewv_.sendcounts = Teuchos::rcp(new std::vector<int>(nroots));
286  ialltofewv_.sdispls = Teuchos::rcp(new std::vector<int>(nroots));
287  ialltofewv_.recvcounts = Teuchos::rcp(new std::vector<int>);
288  ialltofewv_.rdispls = Teuchos::rcp(new std::vector<int>);
289 
290  for (int rootIdx = 0; rootIdx < nroots; ++rootIdx) {
291  const int root = roots[rootIdx];
292 
293  // If this root does not appear in getProcsTo(), it just means we send it nothing.
294  // getProcsTo() lists each destination process at most once, so the first match below is the only match.
295  size_type rootProcIndex = plan.getProcsTo().size(); // sentinel value -> not found
296  for (size_type pi = 0; pi < plan.getProcsTo().size(); ++pi) {
297  if (plan.getProcsTo()[pi] == root) {
298  rootProcIndex = pi;
299  break;
300  }
301  }
302 
303  // am I sending to root?
304  int sendcount = 0;
305  if (rootProcIndex != plan.getProcsTo().size()) {
306  sendcount = exportLengths[rootProcIndex];
307  }
308  (*ialltofewv_.sendcounts)[rootIdx] = sendcount;
309 
310  int sdispl = 0;
311  if (0 != sendcount) {
312  sdispl = exportStarts[rootProcIndex];
313  }
314  (*ialltofewv_.sdispls)[rootIdx] = sdispl;
315 
316  // If I happen to be this root, set up my receive metadata
317  if (comm->getRank() == root) {
318  // don't recv anything from anywhere by default
319  ialltofewv_.recvcounts->resize(comm->getSize());
320  std::fill(ialltofewv_.recvcounts->begin(), ialltofewv_.recvcounts->end(), 0);
321  ialltofewv_.rdispls->resize(comm->getSize());
322  std::fill(ialltofewv_.rdispls->begin(), ialltofewv_.rdispls->end(), 0);
323 
324  const size_type actualNumReceives =
325  Teuchos::as<size_type>(plan.getNumReceives()) +
326  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
327 
328  for (size_type i = 0; i < actualNumReceives; ++i) {
329  const int src = plan.getProcsFrom()[i];
330  (*ialltofewv_.rdispls)[src] = importStarts[i];
331  (*ialltofewv_.recvcounts)[src] = Teuchos::as<int>(importLengths[i]);
332  }
333  }
334 
335  } // rootIdx
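 // Worked example of the metadata built above (made-up numbers): with
 // roots = {0, 3}, and this rank sending 5 packets to root 3 starting at export
 // offset 12 (and nothing to root 0), we end up with
 //   sendcounts = {0, 5}, sdispls = {0, 12},
 // i.e. both arrays are indexed by position in `roots`, not by rank. By
 // contrast, recvcounts/rdispls are indexed by source rank and are filled only
 // on the root process itself.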
336 
337  // TODO: do we need to pass ExportValue{} here?
338  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<ExportValue>::getType(ExportValue{});
339  // FIXME: is there a better way to do this?
340  Teuchos::RCP<const Teuchos::MpiComm<int>> tMpiComm =
341  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
342  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> oMpiComm =
343  tMpiComm->getRawMpiComm();
344  MPI_Comm mpiComm = (*oMpiComm)();
345 
346  // The buffers are handed straight to MPI rather than accessed through Kokkos, so we rely on MPI to do the right thing.
347  // We only require that the send and receive buffers live on the same side (host or device); see the static_assert below.
348  constexpr bool recvDevAccess = Kokkos::SpaceAccessibility<
349  Kokkos::DefaultExecutionSpace, typename ImpView::memory_space>::accessible;
350  constexpr bool sendDevAccess = Kokkos::SpaceAccessibility<
351  Kokkos::DefaultExecutionSpace, typename ExpView::memory_space>::accessible;
352  static_assert(recvDevAccess == sendDevAccess, "sending across host/device");
353 
354  const int err = ialltofewv_.impl.post<recvDevAccess>(exports.data(), ialltofewv_.sendcounts->data(), ialltofewv_.sdispls->data(), rawType,
355  imports.data(), ialltofewv_.recvcounts->data(), ialltofewv_.rdispls->data(),
356  roots, nroots,
357  rawType,
358  mpiTag_, mpiComm, &(*ialltofewv_.req));
359 
360  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
361  "ialltofewv failed with error \""
362  << Teuchos::mpiErrorCodeToString(err)
363  << "\".");
364 }
365 
366 template <class ExpView, class ImpView>
367 void DistributorActor::doPostsAllToAllImpl(const DistributorPlan& plan,
368  const ExpView& exports,
369  const SubViewLimits& exportSubViewLimits,
370  const ImpView& imports,
371  const SubViewLimits& importSubViewLimits) {
372  TEUCHOS_TEST_FOR_EXCEPTION(
373  !plan.getIndicesTo().is_null(), std::runtime_error,
374  "Send Type=\"Alltoall\" only works for fast-path communication.");
375 
376  using size_type = Teuchos::Array<size_t>::size_type;
377 
378  auto comm = plan.getComm();
379  std::vector<int> sendcounts(comm->getSize(), 0);
380  std::vector<int> sdispls(comm->getSize(), 0);
381  std::vector<int> recvcounts(comm->getSize(), 0);
382  std::vector<int> rdispls(comm->getSize(), 0);
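 // MPI_Alltoallv expects one (count, displacement) entry per rank in the
 // communicator, so these arrays are comm-sized and default to zero for ranks
 // this process does not exchange data with.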
383 
384  auto& [importStarts, importLengths] = importSubViewLimits;
385  auto& [exportStarts, exportLengths] = exportSubViewLimits;
386 
387  for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
388  sdispls[plan.getProcsTo()[pp]] = exportStarts[pp];
389  size_t numPackets = exportLengths[pp];
390  // numPackets is converted down to int, so make sure it can be represented
391  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
392  "Tpetra::Distributor::doPostsAllToAll: "
393  "Send count for send "
394  << pp << " (" << numPackets
395  << ") is too large "
396  "to be represented as int.");
397  sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
398  }
399 
400  const size_type actualNumReceives =
401  Teuchos::as<size_type>(plan.getNumReceives()) +
402  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
403 
404  for (size_type i = 0; i < actualNumReceives; ++i) {
405  rdispls[plan.getProcsFrom()[i]] = importStarts[i];
406  size_t totalPacketsFrom_i = importLengths[i];
407  // totalPacketsFrom_i is converted down to int, so make sure it can be
408  // represented
409  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
410  std::logic_error,
411  "Tpetra::Distributor::doPostsAllToAll: "
412  "Recv count for receive "
413  << i << " (" << totalPacketsFrom_i
414  << ") is too large "
415  "to be represented as int.");
416  recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
417  }
418 
419  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
420  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
421  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
422  mpiComm->getRawMpiComm();
423  using T = typename ExpView::non_const_value_type;
424  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
425 
426 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
427  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
428  MPIX_Comm* mpixComm = *plan.getMPIXComm();
429  TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
430  "MPIX_Comm is null in doPostsAllToAll \""
431  << __FILE__ << ":" << __LINE__);
432 
433  const int err = MPIX_Alltoallv(
434  exports.data(), sendcounts.data(), sdispls.data(), rawType,
435  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
436 
437  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
438  "MPIX_Alltoallv failed with error \""
439  << Teuchos::mpiErrorCodeToString(err)
440  << "\".");
441 
442  return;
443  }
444 #endif // HAVE_TPETRACORE_MPI_ADVANCE
445 
446  const int err = MPI_Alltoallv(
447  exports.data(), sendcounts.data(), sdispls.data(), rawType,
448  imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
449 
450  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
451  "MPI_Alltoallv failed with error \""
452  << Teuchos::mpiErrorCodeToString(err)
453  << "\".");
454 }
455 
456 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
457 template <class ExpView, class ImpView>
458 void DistributorActor::doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
459  const ExpView& exports,
460  const SubViewLimits& exportSubViewLimits,
461  const ImpView& imports,
462  const SubViewLimits& importSubViewLimits) {
463  TEUCHOS_TEST_FOR_EXCEPTION(
464  !plan.getIndicesTo().is_null(), std::runtime_error,
465  "Send Type=\"Alltoall\" only works for fast-path communication.");
466 
467  const int myRank = plan.getComm()->getRank();
468  MPIX_Comm* mpixComm = *plan.getMPIXComm();
469  using size_type = Teuchos::Array<size_t>::size_type;
470 
471  const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
472  const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
473  std::vector<int> sendcounts(numSends, 0);
474  std::vector<int> sdispls(numSends, 0);
475  std::vector<int> recvcounts(numRecvs, 0);
476  std::vector<int> rdispls(numRecvs, 0);
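 // Unlike MPI_Alltoallv above, the neighborhood collective takes counts and
 // displacements indexed by neighbor position (the order of getProcsFrom() /
 // getProcsTo() passed to the topology below), not by global rank, so these
 // arrays are only neighbor-sized.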
477 
478  auto& [importStarts, importLengths] = importSubViewLimits;
479  auto& [exportStarts, exportLengths] = exportSubViewLimits;
480 
481  for (size_t pp = 0; pp < numSends; ++pp) {
482  sdispls[pp] = exportStarts[pp];
483  size_t numPackets = exportLengths[pp];
484  // numPackets is converted down to int, so make sure it can be represented
485  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
486  "Tpetra::Distributor::doPostsNbrAllToAllV: "
487  "Send count for send "
488  << pp << " (" << numPackets
489  << ") is too large "
490  "to be represented as int.");
491  sendcounts[pp] = static_cast<int>(numPackets);
492  }
493 
494  for (size_type i = 0; i < numRecvs; ++i) {
495  rdispls[i] = importStarts[i];
496  size_t totalPacketsFrom_i = importLengths[i];
497  // totalPacketsFrom_i is converted down to int, so make sure it can be
498  // represented
499  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
500  std::logic_error,
501  "Tpetra::Distributor::doPostsNbrAllToAllV: "
502  "Recv count for receive "
503  << i << " (" << totalPacketsFrom_i
504  << ") is too large "
505  "to be represented as int.");
506  recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
507  }
508 
509  using T = typename ExpView::non_const_value_type;
510  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
511 
512  MPIX_Info* xinfo;
513  MPIX_Topo* xtopo;
514  MPIX_Info_init(&xinfo);
515  MPIX_Topo_init(numRecvs, plan.getProcsFrom().data(), recvcounts.data(),
516  numSends, plan.getProcsTo().data(), sendcounts.data(), xinfo, &xtopo);
517  const int err = MPIX_Neighbor_alltoallv_topo(
518  exports.data(), sendcounts.data(), sdispls.data(), rawType,
519  imports.data(), recvcounts.data(), rdispls.data(), rawType, xtopo, mpixComm);
520  MPIX_Topo_free(&xtopo);
521  MPIX_Info_free(&xinfo);
522 
523  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
524  "MPIX_Neighbor_alltoallv failed with error \""
525  << Teuchos::mpiErrorCodeToString(err)
526  << "\".");
527 }
528 #endif // HAVE_TPETRACORE_MPI_ADVANCE
529 #endif // HAVE_TPETRA_MPI
530 
531 template <class ImpView>
532 void DistributorActor::doPostRecvs(const DistributorPlan& plan,
533  size_t numPackets,
534  const ImpView& imports) {
535  auto importSubViewLimits = plan.getImportViewLimits(numPackets);
536  doPostRecvsImpl(plan, imports, importSubViewLimits);
537 }
538 
539 template <class ImpView>
540 void DistributorActor::doPostRecvs(const DistributorPlan& plan,
541  const ImpView& imports,
542  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
543  auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
544  doPostRecvsImpl(plan, imports, importSubViewLimits);
545 }
546 
547 template <class ImpView>
548 void DistributorActor::doPostRecvsImpl(const DistributorPlan& plan,
549  const ImpView& imports,
550  const SubViewLimits& importSubViewLimits) {
551  static_assert(isKokkosView<ImpView>,
552  "Data arrays for DistributorActor::doPostRecvs must be Kokkos::Views");
553  using Kokkos::Compat::subview_offset;
554  using Teuchos::Array;
555  using Teuchos::as;
556  using Teuchos::ireceive;
557  using size_type = Array<size_t>::size_type;
558  using imports_view_type = ImpView;
559 
560 #ifdef KOKKOS_ENABLE_CUDA
561  static_assert(!std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
562  "Please do not use Tpetra::Distributor with UVM "
563  "allocations. See GitHub issue #1088.");
564 #endif // KOKKOS_ENABLE_CUDA
565 
566 #ifdef KOKKOS_ENABLE_SYCL
567  static_assert(!std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
568  "Please do not use Tpetra::Distributor with SharedUSM "
569  "allocations. See GitHub issue #1088 (corresponding to CUDA).");
570 #endif // KOKKOS_ENABLE_SYCL
571 
572 #if defined(HAVE_TPETRA_MPI)
573  // All-to-all communication layout is quite different from
574  // point-to-point, so we handle it separately.
575 
576  // These send types are collectives that handle both the send and receive sides themselves (posted from doPostSends), so there are no point-to-point receives to post here; just return.
577  const Details::EDistributorSendType sendType = plan.getSendType();
578  if ((sendType == Details::DISTRIBUTOR_ALLTOALL) || (sendType == Details::DISTRIBUTOR_IALLTOFEWV)
579 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
580  || (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) || (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV)
581 #endif
582  ) {
583  return;
584  }
585 #endif // HAVE_TPETRA_MPI
586 
587  ProfilingRegion pr("Tpetra::Distributor::doPostRecvs");
588 
589  const int myProcID = plan.getComm()->getRank();
590 
591  auto& [importStarts, importLengths] = importSubViewLimits;
592 
593  // Distributor uses requestsRecv_.size() and requestsSend_.size()
594  // as the number of outstanding nonblocking message requests, so
595  // we resize to zero to maintain this invariant.
596  //
597  // getNumReceives() does _not_ include the self message, if there is
598  // one. Here, we do actually send a message to ourselves, so we
599  // include any self message in the "actual" number of receives to
600  // post.
601  //
602  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
603  // doesn't (re)allocate its array of requests. That happens in
604  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
605  // demand), or Resize_().
606  const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
607  as<size_type>(plan.hasSelfMessage() ? 1 : 0);
608 
609 #ifdef HAVE_TPETRA_DEBUG
610  size_t totalNumImportPackets = 0;
611  for (size_t i = 0; i < Teuchos::as<size_t>(actualNumReceives); ++i) {
612  totalNumImportPackets += importLengths[i];
613  }
614  TEUCHOS_TEST_FOR_EXCEPTION(
615  imports.extent(0) < totalNumImportPackets, std::runtime_error,
616  "Tpetra::Distributor::doPostRecvs: The 'imports' array must have "
617  "enough entries to hold the expected number of import packets. "
618  "imports.extent(0) = "
619  << imports.extent(0) << " < "
620  "totalNumImportPackets = "
621  << totalNumImportPackets << ".");
622  TEUCHOS_TEST_FOR_EXCEPTION(!requestsRecv_.empty(), std::logic_error,
623  "Tpetra::Distributor::"
624  "doPostRecvs: Process "
625  << myProcID << ": requestsRecv_.size () = "
626  << requestsRecv_.size() << " != 0.");
627 #endif // HAVE_TPETRA_DEBUG
628 
629  requestsRecv_.resize(0);
630 
631  // Post the nonblocking receives. It's common MPI wisdom to post
632  // receives before sends. In MPI terms, this means favoring
633  // adding to the "posted queue" (of receive requests) over adding
634  // to the "unexpected queue" (of arrived messages not yet matched
635  // with a receive).
636  {
637  ProfilingRegion prr("Tpetra::Distributor::doPostRecvs MPI_Irecv");
638 
639  for (size_type i = 0; i < actualNumReceives; ++i) {
640  size_t totalPacketsFrom_i = importLengths[Teuchos::as<size_t>(i)];
641  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
642  std::logic_error,
643  "Tpetra::Distributor::doPostRecvs: "
644  "Recv count for receive "
645  << i << " (" << totalPacketsFrom_i << ") is too large "
646  "to be represented as int.");
647  if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
648  // If my process is receiving these packet(s) from another
649  // process (not a self-receive), and if there is at least
650  // one packet to receive:
651  //
652  // 1. Set up the persisting view (recvBuf) into the imports
653  // array, given the offset and size (total number of
654  // packets from process getProcsFrom()[i]).
655  // 2. Start the Irecv and save the resulting request.
656  imports_view_type recvBuf =
657  subview_offset(imports, importStarts[i], totalPacketsFrom_i);
658  requestsRecv_.push_back(ireceive<int>(recvBuf, plan.getProcsFrom()[i],
659  mpiTag_, *plan.getComm()));
660  }
661  }
662  }
663 }
664 
665 template <class ExpView, class ImpView>
666 void DistributorActor::doPostSends(const DistributorPlan& plan,
667  const ExpView& exports,
668  size_t numPackets,
669  const ImpView& imports) {
670  auto exportSubViewLimits = plan.getExportViewLimits(numPackets);
671  auto importSubViewLimits = plan.getImportViewLimits(numPackets);
672  doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
673 }
674 
675 template <class ExpView, class ImpView>
676 void DistributorActor::doPostSends(const DistributorPlan& plan,
677  const ExpView& exports,
678  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
679  const ImpView& imports,
680  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
681  auto exportSubViewLimits = plan.getExportViewLimits(numExportPacketsPerLID);
682  auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
683  doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
684 }
685 
686 template <class ExpView, class ImpView>
687 void DistributorActor::doPostSendsImpl(const DistributorPlan& plan,
688  const ExpView& exports,
689  const SubViewLimits& exportSubViewLimits,
690  const ImpView& imports,
691  const SubViewLimits& importSubViewLimits) {
692  static_assert(areKokkosViews<ExpView, ImpView>,
693  "Data arrays for DistributorActor::doPostSends must be Kokkos::Views");
694  using Kokkos::Compat::deep_copy_offset;
695  using Kokkos::Compat::subview_offset;
696  using Teuchos::Array;
697  using Teuchos::as;
698  using Teuchos::isend;
699  using Teuchos::send;
700  using size_type = Array<size_t>::size_type;
701  using exports_view_type = ExpView;
702 
703 #ifdef KOKKOS_ENABLE_CUDA
704  static_assert(!std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
705  !std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
706  "Please do not use Tpetra::Distributor with UVM allocations. "
707  "See Trilinos GitHub issue #1088.");
708 #endif // KOKKOS_ENABLE_CUDA
709 
710 #ifdef KOKKOS_ENABLE_SYCL
711  static_assert(!std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
712  !std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
713  "Please do not use Tpetra::Distributor with SharedUSM allocations. "
714  "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
715 #endif // KOKKOS_ENABLE_SYCL
716 
717  ProfilingRegion ps("Tpetra::Distributor::doPostSends");
718 
719  const int myRank = plan.getComm()->getRank();
720  // Run-time configurable parameters that come from the input
721  // ParameterList set by setParameterList().
722  const Details::EDistributorSendType sendType = plan.getSendType();
723 
724  auto& [exportStarts, exportLengths] = exportSubViewLimits;
725  auto& [importStarts, importLengths] = importSubViewLimits;
726 
727 #if defined(HAVE_TPETRA_MPI)
728  // All-to-all communication layout is quite different from
729  // point-to-point, so we handle it separately.
730 
731  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
732  doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
733  return;
734  } else if (sendType == Details::DISTRIBUTOR_IALLTOFEWV) {
735  doPostsIalltofewvImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
736  return;
737  }
738 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
739  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
740  doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
741  return;
742  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
743  doPostsNbrAllToAllVImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
744  return;
745  }
746 #endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)
747 
748 #else // HAVE_TPETRA_MPI
749  if (plan.hasSelfMessage()) {
750  // This is how we "send a message to ourself": we copy from
751  // the export buffer to the import buffer. That saves
752  // Teuchos::Comm implementations other than MpiComm (in
753  // particular, SerialComm) the trouble of implementing self
754  // messages correctly. (To do this right, SerialComm would
755  // need internal buffer space for messages, keyed on the
756  // message's tag.)
757  size_t selfReceiveOffset = 0;
758  deep_copy_offset(imports, exports, selfReceiveOffset,
759  exportStarts[0],
760  exportLengths[0]);
761  }
762  // should we just return here?
763  // likely not as comm could be a serial comm
764 #endif // HAVE_TPETRA_MPI
765 
766  size_t selfReceiveOffset = 0;
767 
768 #ifdef HAVE_TPETRA_DEBUG
769  TEUCHOS_TEST_FOR_EXCEPTION(requestsSend_.size() != 0,
770  std::logic_error,
771  "Tpetra::Distributor::doPostSends: Process "
772  << myRank << ": requestsSend_.size() = " << requestsSend_.size() << " != 0.");
773 #endif // HAVE_TPETRA_DEBUG
774 
775  // Distributor uses requestsRecv_.size() and requestsSend_.size()
776  // as the number of outstanding nonblocking message requests, so
777  // we resize to zero to maintain this invariant.
778  //
779  // getNumReceives() does _not_ include the self message, if there is
780  // one. Here, we do actually send a message to ourselves, so we
781  // include any self message in the "actual" number of receives to
782  // post.
783  //
784  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
785  // doesn't (re)allocate its array of requests. That happens in
786  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
787  // demand), or Resize_().
788  const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
789  as<size_type>(plan.hasSelfMessage() ? 1 : 0);
790  requestsSend_.resize(0);
791 
792  {
793  for (size_type i = 0; i < actualNumReceives; ++i) {
794  if (plan.getProcsFrom()[i] == myRank) { // Receiving from myself
795  selfReceiveOffset = importStarts[i]; // Remember the self-recv offset
796  }
797  }
798  }
799 
800  ProfilingRegion pss("Tpetra::Distributor::doPostSends sends");
801 
802  // Set up a scan through the getProcsTo() list, starting with the higher-numbered
803  // procs (this should help balance message traffic).
804  //
805  // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
806  // It doesn't depend on the input at all.
807  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
808  size_t procIndex = 0;
809  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
810  ++procIndex;
811  }
812  if (procIndex == numBlocks) {
813  procIndex = 0;
814  }
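 // Example (made-up ranks): on rank 3 with getProcsTo() = {0, 2, 5, 7}, the
 // scan above sets procIndex to the position of proc 5, so sends are issued in
 // the order 5, 7, 0, 2 rather than always starting at proc 0.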
815 
816  size_t selfNum = 0;
817  size_t selfIndex = 0;
818 
819  if (plan.getIndicesTo().is_null()) {
820  const char isend_region[] = "Tpetra::Distributor::doPostSends MPI_Isend FAST";
821  const char send_region[] = "Tpetra::Distributor::doPostSends MPI_Send FAST";
822  ProfilingRegion pssf((sendType == Details::DISTRIBUTOR_ISEND) ? isend_region : send_region);
823 
824  // Data are already blocked (laid out) by process, so we don't
825  // need a separate send buffer (besides the exports array).
826  for (size_t i = 0; i < numBlocks; ++i) {
827  size_t p = i + procIndex;
828  if (p > (numBlocks - 1)) {
829  p -= numBlocks;
830  }
831 
832  if (plan.getProcsTo()[p] != myRank) {
833  if (exportLengths[p] == 0) {
834  // Do not attempt to send messages of length 0.
835  continue;
836  }
837 
838  exports_view_type tmpSend = subview_offset(exports, exportStarts[p], exportLengths[p]);
839 
840  if (sendType == Details::DISTRIBUTOR_ISEND) {
841  // NOTE: This looks very similar to tmpSend above, but removing tmpSendBuf
842  // and using tmpSend instead leads to a performance hit on Arm SerialNode
843  // builds.
844  exports_view_type tmpSendBuf =
845  subview_offset(exports, exportStarts[p], exportLengths[p]);
846  requestsSend_.push_back(isend<int>(tmpSendBuf, plan.getProcsTo()[p],
847  mpiTag_, *plan.getComm()));
848  } else { // DISTRIBUTOR_SEND
849  send<int>(tmpSend,
850  as<int>(tmpSend.size()),
851  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
852  }
853  } else { // "Sending" the message to myself
854  selfNum = p;
855  }
856  }
857 
858  if (plan.hasSelfMessage()) {
859  // This is how we "send a message to ourself": we copy from
860  // the export buffer to the import buffer. That saves
861  // Teuchos::Comm implementations other than MpiComm (in
862  // particular, SerialComm) the trouble of implementing self
863  // messages correctly. (To do this right, SerialComm would
864  // need internal buffer space for messages, keyed on the
865  // message's tag.)
866  deep_copy_offset(imports, exports, selfReceiveOffset,
867  exportStarts[selfNum], exportLengths[selfNum]);
868  }
869 
870  } else { // data are not blocked by proc, use send buffer
871  ProfilingRegion psss("Tpetra::Distributor::doPostSends: MPI_Send SLOW");
872 
873  using Packet = typename ExpView::non_const_value_type;
874  using Layout = typename ExpView::array_layout;
875  using Device = typename ExpView::device_type;
876  using Mem = typename ExpView::memory_traits;
877 
878  // This buffer is long enough for only one message at a time.
879  // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
880  // of sendType requested by user.
881  // This code path formerly errored out with message:
882  // Tpetra::Distributor::doPosts(3 args):
883  // The "send buffer" code path
884  // doesn't currently work with nonblocking sends.
885  // Now, we opt to just do the communication in a way that works.
886 #ifdef HAVE_TPETRA_DEBUG
887  if (sendType != Details::DISTRIBUTOR_SEND) {
888  if (plan.getComm()->getRank() == 0)
889  std::cout << "The requested Tpetra send type "
890  << DistributorSendTypeEnumToString(sendType)
891  << " requires Distributor data to be ordered by"
892  << " the receiving processor rank. Since these"
893  << " data are not ordered, Tpetra will use Send"
894  << " instead." << std::endl;
895  }
896 #endif
897 
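 // First pass: find the largest single outgoing message so that one staging
 // buffer (sendArray below) can be reused for every send.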
898  size_t maxSendLength = 0;
899  for (size_t i = 0; i < numBlocks; ++i) {
900  size_t p = i + procIndex;
901  if (p > (numBlocks - 1)) {
902  p -= numBlocks;
903  }
904 
905  size_t sendArrayOffset = 0;
906  size_t j = plan.getStartsTo()[p];
907  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
908  sendArrayOffset += exportLengths[j];
909  }
910  maxSendLength = std::max(maxSendLength, sendArrayOffset);
911  }
912  Kokkos::View<Packet*, Layout, Device, Mem> sendArray("sendArray", maxSendLength);
913 
914  for (size_t i = 0; i < numBlocks; ++i) {
915  size_t p = i + procIndex;
916  if (p > (numBlocks - 1)) {
917  p -= numBlocks;
918  }
919 
920  if (plan.getProcsTo()[p] != myRank) {
921  size_t sendArrayOffset = 0;
922  size_t j = plan.getStartsTo()[p];
923  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
924  packOffset(sendArray, exports, sendArrayOffset, exportStarts[j], exportLengths[j]);
925  sendArrayOffset += exportLengths[j];
926  }
927  typename ExpView::execution_space().fence();
928 
929  ImpView tmpSend =
930  subview_offset(sendArray, size_t(0), sendArrayOffset);
931 
932  send<int>(tmpSend,
933  as<int>(tmpSend.size()),
934  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
935  } else { // "Sending" the message to myself
936  selfNum = p;
937  selfIndex = plan.getStartsTo()[p];
938  }
939  }
940 
941  if (plan.hasSelfMessage()) {
942  for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
943  packOffset(imports, exports, selfReceiveOffset, exportStarts[selfIndex], exportLengths[selfIndex]);
944  selfReceiveOffset += exportLengths[selfIndex];
945  ++selfIndex;
946  }
947  }
948  }
949 }
950 
951 } // namespace Tpetra::Details
952 
953 #endif