Tpetra_Details_DistributorActor.hpp
// @HEADER
// *****************************************************************************
// Tpetra: Templated Linear Algebra Services Package
//
// Copyright 2008 NTESS and the Tpetra contributors.
// SPDX-License-Identifier: BSD-3-Clause
// *****************************************************************************
// @HEADER

#ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
#define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP

#include <optional>
#include <vector>

#include "Teuchos_Assert.hpp"
#include "Tpetra_Details_DistributorPlan.hpp"
#include "Tpetra_Details_Profiling.hpp"
#include "Tpetra_Util.hpp"

#include "Teuchos_Array.hpp"
#include "Teuchos_Comm.hpp"
#include "Teuchos_RCP.hpp"

#include "Kokkos_TeuchosCommAdapters.hpp"

#ifdef HAVE_TPETRA_MPI
#include "mpi.h"
#include "Teuchos_DefaultMpiComm.hpp"
#include "Tpetra_Details_Ialltofewv.hpp"
#include "Tpetra_Details_MpiTypeTraits.hpp"
#endif

namespace Tpetra::Details {

template <class View>
constexpr bool isKokkosView = Kokkos::is_view<View>::value;

template <class View1, class View2>
constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;

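// A DistributorActor executes the communication described by a
// DistributorPlan: the plan records which processes exchange how much data,
// while the actor owns the MPI requests and actually posts and completes the
// messages.
//
// Minimal usage sketch (hypothetical names: "plan", "exports", "imports",
// and "numPackets" are assumed to have been set up elsewhere, e.g. by
// Tpetra::Distributor):
//
//   Tpetra::Details::DistributorActor actor;
//   actor.doPosts(plan, exports, numPackets, imports);  // post recvs + sends
//   // ... overlap local work here ...
//   actor.doWaits(plan);                                // complete them
//
// Or, equivalently: actor.doPostsAndWaits(plan, exports, numPackets, imports);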
class DistributorActor {
  static constexpr int DEFAULT_MPI_TAG = 1;

  using IndexView = DistributorPlan::IndexView;
  using SubViewLimits = DistributorPlan::SubViewLimits;

public:
  DistributorActor();
  DistributorActor(const DistributorActor& otherActor) = default;

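  // Post all sends and receives described by plan, then block until every
  // message has completed.  The two overloads differ only in whether each
  // LID carries a fixed number of packets (numPackets) or a per-LID count.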
  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       size_t numPackets,
                       const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                       const ImpView& imports,
                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

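  // Post only the nonblocking receives described by plan into imports.
  // These are no-ops for the collective (all-to-all style) send types,
  // which need no matching receives.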
  template <class ImpView>
  void doPostRecvs(const DistributorPlan& plan,
                   size_t numPackets,
                   const ImpView& imports);

  template <class ImpView>
  void doPostRecvs(const DistributorPlan& plan,
                   const ImpView& imports,
                   const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

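  // Post the sends described by plan.  For the collective send types this
  // performs the whole exchange (receive side included); for point-to-point
  // send types it posts MPI_Isend / MPI_Send messages that pair with the
  // receives posted by doPostRecvs.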
  template <class ExpView, class ImpView>
  void doPostSends(const DistributorPlan& plan,
                   const ExpView& exports,
                   size_t numPackets,
                   const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPostSends(const DistributorPlan& plan,
                   const ExpView& exports,
                   const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                   const ImpView& imports,
                   const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

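  // Post receives and then sends (doPostRecvs followed by doPostSends)
  // without waiting for completion; call doWaits afterwards.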
  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               size_t numPackets,
               const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
               const ImpView& imports,
               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

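  // Wait on outstanding communication: doWaitsRecv and doWaitsSend wait on
  // the receive and send requests respectively, doWaitsIalltofewv completes
  // a pending Ialltofewv exchange, and doWaits completes everything posted
  // for the given plan.  isReady() reports whether no requests remain
  // outstanding.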
  void doWaits(const DistributorPlan& plan);

  void doWaitsRecv(const DistributorPlan& plan);

  void doWaitsSend(const DistributorPlan& plan);

  void doWaitsIalltofewv(const DistributorPlan& plan);

  bool isReady() const;

private:

  template <class ImpView>
  void doPostRecvsImpl(const DistributorPlan& plan,
                       const ImpView& imports,
                       const SubViewLimits& totalPacketsFrom);

  template <class ExpView, class ImpView>
  void doPostSendsImpl(const DistributorPlan& plan,
                       const ExpView& exports,
                       const SubViewLimits& exportSubViewLimits,
                       const ImpView& imports,
                       const SubViewLimits& importSubViewLimits);

#ifdef HAVE_TPETRA_MPI
  template <class ExpView, class ImpView>
  void doPostsAllToAllImpl(const DistributorPlan& plan,
                           const ExpView& exports,
                           const SubViewLimits& exportSubViewLimits,
                           const ImpView& imports,
                           const SubViewLimits& importSubViewLimits);

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  template <class ExpView, class ImpView>
  void doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
                               const ExpView& exports,
                               const SubViewLimits& exportSubViewLimits,
                               const ImpView& imports,
                               const SubViewLimits& importSubViewLimits);
#endif // HAVE_TPETRACORE_MPI_ADVANCE

  template <typename ExpView, typename ImpView>
  void doPostsIalltofewvImpl(const DistributorPlan& plan,
                             const ExpView& exports,
                             const SubViewLimits& exportSubViewLimits,
                             const ImpView& imports,
                             const SubViewLimits& importSubViewLimits);

  // ialltofewv members
  struct {
    Details::Ialltofewv impl;
    std::optional<Details::Ialltofewv::Req> req;
    Teuchos::RCP<std::vector<int>> sendcounts;
    Teuchos::RCP<std::vector<int>> sdispls;
    Teuchos::RCP<std::vector<int>> recvcounts;
    Teuchos::RCP<std::vector<int>> rdispls;
    std::vector<int> roots;
  } ialltofewv_;

#endif // HAVE_TPETRA_MPI

  int mpiTag_;

  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsRecv_;
  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsSend_;
};

template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               size_t numPackets,
                               const ImpView& imports)
{
  doPostRecvs(plan, numPackets, imports);
  doPostSends(plan, exports, numPackets, imports);
}

template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       size_t numPackets,
                                       const ImpView& imports)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numPackets, imports);
  doWaits(plan);
}

template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                               const ImpView& imports,
                               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  doPostRecvs(plan, imports, numImportPacketsPerLID);
  doPostSends(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
}

template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                                       const ImpView& imports,
                                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
  doWaits(plan);
}

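// Helpers for packOffset below: a compile-time check of whether both views
// are host-accessible selects between a plain memcpy (host path) and a
// Kokkos deep_copy over subviews (device path).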
template <typename ViewType>
using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;

template <typename DstViewType, typename SrcViewType>
using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
                                                HostAccessibility<SrcViewType>::accessible>;

template <typename DstViewType, typename SrcViewType>
using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
                                                   !HostAccessibility<SrcViewType>::accessible>;

template <typename DstViewType, typename SrcViewType>
enableIfHostAccessible<DstViewType, SrcViewType>
packOffset(const DstViewType& dst,
           const SrcViewType& src,
           const size_t dst_offset,
           const size_t src_offset,
           const size_t size)
{
  memcpy((void*) (dst.data()+dst_offset), src.data()+src_offset, size*sizeof(typename DstViewType::value_type));
}

template <typename DstViewType, typename SrcViewType>
enableIfNotHostAccessible<DstViewType, SrcViewType>
packOffset(const DstViewType& dst,
           const SrcViewType& src,
           const size_t dst_offset,
           const size_t src_offset,
           const size_t size)
{
  Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
}

#ifdef HAVE_TPETRA_MPI

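// Posts the exchange as a single nonblocking "all-to-few" collective: every
// process contributes a send segment for each root listed in the plan, and
// each root receives one segment per source process.  The counts,
// displacements, and request are stashed in ialltofewv_ so that
// doWaitsIalltofewv can complete the operation later.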
template <class ExpView, class ImpView>
void DistributorActor::doPostsIalltofewvImpl(const DistributorPlan& plan,
                                             const ExpView& exports,
                                             const SubViewLimits& exportSubViewLimits,
                                             const ImpView& imports,
                                             const SubViewLimits& importSubViewLimits) {
  using size_type = Teuchos::Array<size_t>::size_type;
  using ExportValue = typename ExpView::non_const_value_type;

  ProfilingRegion pr("Tpetra::Distributor: doPostsIalltofewvImpl");

  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Ialltofewv\" only works for fast-path communication.");

  TEUCHOS_TEST_FOR_EXCEPTION(
      bool(ialltofewv_.req), std::runtime_error,
      "This actor has an active Ialltofewv already");

  TEUCHOS_TEST_FOR_EXCEPTION(
      bool(ialltofewv_.sendcounts), std::runtime_error,
      "This actor has an active Ialltofewv already");

  TEUCHOS_TEST_FOR_EXCEPTION(
      bool(ialltofewv_.sdispls), std::runtime_error,
      "This actor has an active Ialltofewv already");

  TEUCHOS_TEST_FOR_EXCEPTION(
      bool(ialltofewv_.recvcounts), std::runtime_error,
      "This actor has an active Ialltofewv already");

  TEUCHOS_TEST_FOR_EXCEPTION(
      bool(ialltofewv_.rdispls), std::runtime_error,
      "This actor has an active Ialltofewv already");

  auto comm = plan.getComm();

  const auto& [importStarts, importLengths] = importSubViewLimits;
  const auto& [exportStarts, exportLengths] = exportSubViewLimits;

  ialltofewv_.roots = plan.getRoots();
  const int nroots = ialltofewv_.roots.size();
  const int* roots = ialltofewv_.roots.data();
  ialltofewv_.req = std::make_optional<Details::Ialltofewv::Req>();
  ialltofewv_.sendcounts = Teuchos::rcp(new std::vector<int>(nroots));
  ialltofewv_.sdispls = Teuchos::rcp(new std::vector<int>(nroots));
  ialltofewv_.recvcounts = Teuchos::rcp(new std::vector<int>);
  ialltofewv_.rdispls = Teuchos::rcp(new std::vector<int>);

  for (int rootIdx = 0; rootIdx < nroots; ++rootIdx) {
    const int root = roots[rootIdx];

    // If we can't find the root proc index in our plan, it just means we
    // send 0.  This relies on each root appearing at most once in
    // getProcsTo().
    size_type rootProcIndex = plan.getProcsTo().size(); // sentinel value -> not found
    for (size_type pi = 0; pi < plan.getProcsTo().size(); ++pi) {
      if (plan.getProcsTo()[pi] == root) {
        rootProcIndex = pi;
        break;
      }
    }

    // Am I sending to root?
    int sendcount = 0;
    if (rootProcIndex != plan.getProcsTo().size()) {
      sendcount = exportLengths[rootProcIndex];
    }
    (*ialltofewv_.sendcounts)[rootIdx] = sendcount;

    int sdispl = 0;
    if (0 != sendcount) {
      sdispl = exportStarts[rootProcIndex];
    }
    (*ialltofewv_.sdispls)[rootIdx] = sdispl;

    // If I happen to be this root, set up my receive metadata.
    if (comm->getRank() == root) {

      // Don't recv anything from anywhere by default.
      ialltofewv_.recvcounts->resize(comm->getSize());
      std::fill(ialltofewv_.recvcounts->begin(), ialltofewv_.recvcounts->end(), 0);
      ialltofewv_.rdispls->resize(comm->getSize());
      std::fill(ialltofewv_.rdispls->begin(), ialltofewv_.rdispls->end(), 0);

      const size_type actualNumReceives =
          Teuchos::as<size_type>(plan.getNumReceives()) +
          Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);

      for (size_type i = 0; i < actualNumReceives; ++i) {
        const int src = plan.getProcsFrom()[i];
        (*ialltofewv_.rdispls)[src] = importStarts[i];
        (*ialltofewv_.recvcounts)[src] = Teuchos::as<int>(importLengths[i]);
      }
    }

  } // rootIdx

  // TODO: do we need to pass ExportValue{} here?
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<ExportValue>::getType(ExportValue{});
  // FIXME: is there a better way to do this?
  Teuchos::RCP<const Teuchos::MpiComm<int>> tMpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> oMpiComm =
      tMpiComm->getRawMpiComm();
  MPI_Comm mpiComm = (*oMpiComm)();

  // We don't care about send-side accessibility because the send buffer is
  // not accessed through Kokkos; we rely on MPI to do the right thing.
  constexpr bool recvDevAccess = Kokkos::SpaceAccessibility<
      Kokkos::DefaultExecutionSpace, typename ImpView::memory_space>::accessible;
  constexpr bool sendDevAccess = Kokkos::SpaceAccessibility<
      Kokkos::DefaultExecutionSpace, typename ExpView::memory_space>::accessible;
  static_assert(recvDevAccess == sendDevAccess, "sending across host/device");

  const int err = ialltofewv_.impl.post<recvDevAccess>(
      exports.data(), ialltofewv_.sendcounts->data(), ialltofewv_.sdispls->data(), rawType,
      imports.data(), ialltofewv_.recvcounts->data(), ialltofewv_.rdispls->data(),
      roots, nroots,
      rawType,
      mpiTag_, mpiComm, &(*ialltofewv_.req));

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "ialltofewv failed with error \""
                                 << Teuchos::mpiErrorCodeToString(err)
                                 << "\".");
}

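// Performs the exchange as a single blocking MPI_Alltoallv (or MPIX_Alltoallv
// when the MPI Advance all-to-all send type is selected), building dense
// per-rank count and displacement arrays from the plan.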
template <class ExpView, class ImpView>
void DistributorActor::doPostsAllToAllImpl(const DistributorPlan& plan,
                                           const ExpView& exports,
                                           const SubViewLimits& exportSubViewLimits,
                                           const ImpView& imports,
                                           const SubViewLimits& importSubViewLimits) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  using size_type = Teuchos::Array<size_t>::size_type;

  auto comm = plan.getComm();
  std::vector<int> sendcounts(comm->getSize(), 0);
  std::vector<int> sdispls(comm->getSize(), 0);
  std::vector<int> recvcounts(comm->getSize(), 0);
  std::vector<int> rdispls(comm->getSize(), 0);

  auto& [importStarts, importLengths] = importSubViewLimits;
  auto& [exportStarts, exportLengths] = exportSubViewLimits;

  for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
    sdispls[plan.getProcsTo()[pp]] = exportStarts[pp];
    size_t numPackets = exportLengths[pp];
    // numPackets is converted down to int, so make sure it can be represented
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPostsAllToAll: "
                               "Send count for send "
                                   << pp << " (" << numPackets
                                   << ") is too large "
                                      "to be represented as int.");
    sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
  }

  const size_type actualNumReceives =
      Teuchos::as<size_type>(plan.getNumReceives()) +
      Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);

  for (size_type i = 0; i < actualNumReceives; ++i) {
    rdispls[plan.getProcsFrom()[i]] = importStarts[i];
    size_t totalPacketsFrom_i = importLengths[i];
    // totalPacketsFrom_i is converted down to int, so make sure it can be
    // represented
    TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                               std::logic_error,
                               "Tpetra::Distributor::doPostsAllToAll: "
                               "Recv count for receive "
                                   << i << " (" << totalPacketsFrom_i
                                   << ") is too large "
                                      "to be represented as int.");
    recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
  }

  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
      mpiComm->getRawMpiComm();
  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
    MPIX_Comm* mpixComm = *plan.getMPIXComm();
    TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
                               "MPIX_Comm is null in doPostsAllToAll \""
                                   << __FILE__ << ":" << __LINE__);

    const int err = MPIX_Alltoallv(
        exports.data(), sendcounts.data(), sdispls.data(), rawType,
        imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

    TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                               "MPIX_Alltoallv failed with error \""
                                   << Teuchos::mpiErrorCodeToString(err)
                                   << "\".");

    return;
  }
#endif // HAVE_TPETRACORE_MPI_ADVANCE

  const int err = MPI_Alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPI_Alltoallv failed with error \""
                                 << Teuchos::mpiErrorCodeToString(err)
                                 << "\".");
}

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
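// Performs the exchange with MPI Advance's neighborhood collective: a
// temporary MPIX_Topo describing the plan's send/recv neighbors is built,
// MPIX_Neighbor_alltoallv_topo does the communication, and the topology is
// freed again.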
template <class ExpView, class ImpView>
void DistributorActor::doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
                                               const ExpView& exports,
                                               const SubViewLimits& exportSubViewLimits,
                                               const ImpView& imports,
                                               const SubViewLimits& importSubViewLimits) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  const int myRank = plan.getComm()->getRank();
  MPIX_Comm* mpixComm = *plan.getMPIXComm();
  using size_type = Teuchos::Array<size_t>::size_type;

  const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
  const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
  std::vector<int> sendcounts(numSends, 0);
  std::vector<int> sdispls(numSends, 0);
  std::vector<int> recvcounts(numRecvs, 0);
  std::vector<int> rdispls(numRecvs, 0);

  auto& [importStarts, importLengths] = importSubViewLimits;
  auto& [exportStarts, exportLengths] = exportSubViewLimits;

  for (size_t pp = 0; pp < numSends; ++pp) {
    sdispls[pp] = exportStarts[pp];
    size_t numPackets = exportLengths[pp];
    // numPackets is converted down to int, so make sure it can be represented
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPostsNbrAllToAllV: "
                               "Send count for send "
                                   << pp << " (" << numPackets
                                   << ") is too large "
                                      "to be represented as int.");
    sendcounts[pp] = static_cast<int>(numPackets);
  }

  for (size_type i = 0; i < numRecvs; ++i) {
    rdispls[i] = importStarts[i];
    size_t totalPacketsFrom_i = importLengths[i];
    // totalPacketsFrom_i is converted down to int, so make sure it can be
    // represented
    TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                               std::logic_error,
                               "Tpetra::Distributor::doPostsNbrAllToAllV: "
                               "Recv count for receive "
                                   << i << " (" << totalPacketsFrom_i
                                   << ") is too large "
                                      "to be represented as int.");
    recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
  }

  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

  MPIX_Info* xinfo;
  MPIX_Topo* xtopo;
  MPIX_Info_init(&xinfo);
  MPIX_Topo_init(numRecvs, plan.getProcsFrom().data(), recvcounts.data(),
                 numSends, plan.getProcsTo().data(), sendcounts.data(), xinfo, &xtopo);
  const int err = MPIX_Neighbor_alltoallv_topo(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, xtopo, mpixComm);
  MPIX_Topo_free(&xtopo);
  MPIX_Info_free(&xinfo);

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPIX_Neighbor_alltoallv failed with error \""
                                 << Teuchos::mpiErrorCodeToString(err)
                                 << "\".");
}
#endif // HAVE_TPETRACORE_MPI_ADVANCE
#endif // HAVE_TPETRA_MPI

template <class ImpView>
void DistributorActor::doPostRecvs(const DistributorPlan& plan,
                                   size_t numPackets,
                                   const ImpView& imports)
{
  auto importSubViewLimits = plan.getImportViewLimits(numPackets);
  doPostRecvsImpl(plan, imports, importSubViewLimits);
}

template <class ImpView>
void DistributorActor::doPostRecvs(const DistributorPlan& plan,
                                   const ImpView& imports,
                                   const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
  doPostRecvsImpl(plan, imports, importSubViewLimits);
}

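// Posts one nonblocking receive (via Teuchos::ireceive) per sending process,
// each landing in its slice of the imports view.  Returns immediately for the
// collective send types, which need no matching receives; self-messages are
// handled on the send side.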
template <class ImpView>
void DistributorActor::doPostRecvsImpl(const DistributorPlan& plan,
                                       const ImpView& imports,
                                       const SubViewLimits& importSubViewLimits)
{
  static_assert(isKokkosView<ImpView>,
                "Data arrays for DistributorActor::doPostRecvs must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::ireceive;
  using Kokkos::Compat::subview_offset;
  using size_type = Array<size_t>::size_type;
  using imports_view_type = ImpView;

#ifdef KOKKOS_ENABLE_CUDA
  static_assert(! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
                "Please do not use Tpetra::Distributor with UVM "
                "allocations. See GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert(! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
                "Please do not use Tpetra::Distributor with SharedUSM "
                "allocations. See GitHub issue #1088 (corresponding to CUDA).");
#endif // KOKKOS_ENABLE_SYCL

#if defined(HAVE_TPETRA_MPI)
  // All-to-all communication layout is quite different from
  // point-to-point, so we handle it separately.

  // These send options require no matching receives, so we just return.
  const Details::EDistributorSendType sendType = plan.getSendType();
  if ((sendType == Details::DISTRIBUTOR_ALLTOALL)
      || (sendType == Details::DISTRIBUTOR_IALLTOFEWV)
#ifdef HAVE_TPETRACORE_MPI_ADVANCE
      || (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL)
      || (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV)
#endif
  ) {
    return;
  }
#endif // HAVE_TPETRA_MPI

  ProfilingRegion pr("Tpetra::Distributor::doPostRecvs");

  const int myProcID = plan.getComm()->getRank();

  auto& [importStarts, importLengths] = importSubViewLimits;

  // Distributor uses requestsRecv_.size() and requestsSend_.size()
  // as the number of outstanding nonblocking message requests, so
  // we resize to zero to maintain this invariant.
  //
  // getNumReceives() does _not_ include the self message, if there is
  // one. Here, we do actually send a message to ourselves, so we
  // include any self message in the "actual" number of receives to
  // post.
  //
  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
  // doesn't (re)allocate its array of requests. That happens in
  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
  // demand), or Resize_().
  const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
      as<size_type>(plan.hasSelfMessage() ? 1 : 0);

#ifdef HAVE_TPETRA_DEBUG
  size_t totalNumImportPackets = 0;
  for (size_t i = 0; i < Teuchos::as<size_t>(actualNumReceives); ++i) {
    totalNumImportPackets += importLengths[i];
  }
  TEUCHOS_TEST_FOR_EXCEPTION(
      imports.extent(0) < totalNumImportPackets, std::runtime_error,
      "Tpetra::Distributor::doPostRecvs: The 'imports' array must have "
      "enough entries to hold the expected number of import packets. "
      "imports.extent(0) = " << imports.extent(0) << " < "
      "totalNumImportPackets = " << totalNumImportPackets << ".");
  TEUCHOS_TEST_FOR_EXCEPTION
    (!requestsRecv_.empty(), std::logic_error, "Tpetra::Distributor::"
     "doPostRecvs: Process " << myProcID << ": requestsRecv_.size () = "
     << requestsRecv_.size() << " != 0.");
#endif // HAVE_TPETRA_DEBUG

  requestsRecv_.resize(0);

  // Post the nonblocking receives. It's common MPI wisdom to post
  // receives before sends. In MPI terms, this means favoring
  // adding to the "posted queue" (of receive requests) over adding
  // to the "unexpected queue" (of arrived messages not yet matched
  // with a receive).
  {
    ProfilingRegion prr("Tpetra::Distributor::doPostRecvs MPI_Irecv");

    for (size_type i = 0; i < actualNumReceives; ++i) {
      size_t totalPacketsFrom_i = importLengths[Teuchos::as<size_t>(i)];
      TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                                 std::logic_error, "Tpetra::Distributor::doPostRecvs: "
                                 "Recv count for receive " << i << " (" << totalPacketsFrom_i << ") is too large "
                                 "to be represented as int.");
      if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
        // If my process is receiving these packet(s) from another
        // process (not a self-receive), and if there is at least
        // one packet to receive:
        //
        // 1. Set up the persisting view (recvBuf) into the imports
        //    array, given the offset and size (total number of
        //    packets from process getProcsFrom()[i]).
        // 2. Start the Irecv and save the resulting request.
        imports_view_type recvBuf =
            subview_offset(imports, importStarts[i], totalPacketsFrom_i);
        requestsRecv_.push_back(ireceive<int>(recvBuf, plan.getProcsFrom()[i],
                                              mpiTag_, *plan.getComm()));
      }
    }
  }
}

template <class ExpView, class ImpView>
void DistributorActor::doPostSends(const DistributorPlan& plan,
                                   const ExpView& exports,
                                   size_t numPackets,
                                   const ImpView& imports)
{
  auto exportSubViewLimits = plan.getExportViewLimits(numPackets);
  auto importSubViewLimits = plan.getImportViewLimits(numPackets);
  doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
}

template <class ExpView, class ImpView>
void DistributorActor::doPostSends(const DistributorPlan& plan,
                                   const ExpView& exports,
                                   const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                                   const ImpView& imports,
                                   const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID)
{
  auto exportSubViewLimits = plan.getExportViewLimits(numExportPacketsPerLID);
  auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
  doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
}

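// Dispatches on the plan's send type: the collective types are forwarded to
// the *Impl routines above; otherwise the "fast" path sends each process's
// block of exports directly (Isend or blocking Send), and the "slow" path
// (indexed exports) packs each message into a scratch buffer first.  Self
// messages are realized as a local copy from exports to imports.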
template <class ExpView, class ImpView>
void DistributorActor::doPostSendsImpl(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       const SubViewLimits& exportSubViewLimits,
                                       const ImpView& imports,
                                       const SubViewLimits& importSubViewLimits)
{
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostSends must be Kokkos::Views");
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::isend;
  using Teuchos::send;
  using Kokkos::Compat::subview_offset;
  using Kokkos::Compat::deep_copy_offset;
  using size_type = Array<size_t>::size_type;
  using exports_view_type = ExpView;

#ifdef KOKKOS_ENABLE_CUDA
  static_assert
    (! std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
     ! std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
     "Please do not use Tpetra::Distributor with UVM allocations. "
     "See Trilinos GitHub issue #1088.");
#endif // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert
    (! std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
     ! std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
     "Please do not use Tpetra::Distributor with SharedUSM allocations. "
     "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
#endif // KOKKOS_ENABLE_SYCL

  ProfilingRegion ps("Tpetra::Distributor::doPostSends");

  const int myRank = plan.getComm()->getRank();
  // Run-time configurable parameters that come from the input
  // ParameterList set by setParameterList().
  const Details::EDistributorSendType sendType = plan.getSendType();

  auto& [exportStarts, exportLengths] = exportSubViewLimits;
  auto& [importStarts, importLengths] = importSubViewLimits;

#if defined(HAVE_TPETRA_MPI)
  // All-to-all communication layout is quite different from
  // point-to-point, so we handle it separately.

  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
    doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
    return;
  } else if (sendType == Details::DISTRIBUTOR_IALLTOFEWV) {
    doPostsIalltofewvImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
    return;
  }
#ifdef HAVE_TPETRACORE_MPI_ADVANCE
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
    doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
    return;
  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
    doPostsNbrAllToAllVImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
    return;
  }
#endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)

#else // HAVE_TPETRA_MPI
  if (plan.hasSelfMessage()) {
    // This is how we "send a message to ourself": we copy from
    // the export buffer to the import buffer. That saves
    // Teuchos::Comm implementations other than MpiComm (in
    // particular, SerialComm) the trouble of implementing self
    // messages correctly. (To do this right, SerialComm would
    // need internal buffer space for messages, keyed on the
    // message's tag.)
    size_t selfReceiveOffset = 0;
    deep_copy_offset(imports, exports, selfReceiveOffset,
                     exportStarts[0],
                     exportLengths[0]);
  }
  // should we just return here?
  // likely not as comm could be a serial comm
#endif // HAVE_TPETRA_MPI

  size_t selfReceiveOffset = 0;

#ifdef HAVE_TPETRA_DEBUG
  TEUCHOS_TEST_FOR_EXCEPTION
    (requestsSend_.size() != 0,
     std::logic_error,
     "Tpetra::Distributor::doPostSends: Process "
     << myRank << ": requestsSend_.size() = " << requestsSend_.size() << " != 0.");
#endif // HAVE_TPETRA_DEBUG

  // Distributor uses requestsRecv_.size() and requestsSend_.size()
  // as the number of outstanding nonblocking message requests, so
  // we resize to zero to maintain this invariant.
  //
  // getNumReceives() does _not_ include the self message, if there is
  // one. Here, we do actually send a message to ourselves, so we
  // include any self message in the "actual" number of receives to
  // post.
  //
  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
  // doesn't (re)allocate its array of requests. That happens in
  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
  // demand), or Resize_().
  const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
      as<size_type>(plan.hasSelfMessage() ? 1 : 0);
  requestsSend_.resize(0);

  {
    for (size_type i = 0; i < actualNumReceives; ++i) {
      if (plan.getProcsFrom()[i] == myRank) { // Receiving from myself
        selfReceiveOffset = importStarts[i];  // Remember the self-recv offset
      }
    }
  }

  ProfilingRegion pss("Tpetra::Distributor::doPostSends sends");

  // Set up a scan through the getProcsTo() list, starting with the
  // higher-numbered procs (this should help balance message traffic).
  //
  // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
  // It doesn't depend on the input at all.
  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  size_t procIndex = 0;
  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
    ++procIndex;
  }
  if (procIndex == numBlocks) {
    procIndex = 0;
  }

  size_t selfNum = 0;
  size_t selfIndex = 0;

  if (plan.getIndicesTo().is_null()) {
    const char isend_region[] = "Tpetra::Distributor::doPostSends MPI_Isend FAST";
    const char send_region[] = "Tpetra::Distributor::doPostSends MPI_Send FAST";
    ProfilingRegion pssf((sendType == Details::DISTRIBUTOR_ISEND) ? isend_region : send_region);

    // Data are already blocked (laid out) by process, so we don't
    // need a separate send buffer (besides the exports array).
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        if (exportLengths[p] == 0) {
          // Do not attempt to send messages of length 0.
          continue;
        }

        exports_view_type tmpSend = subview_offset(exports, exportStarts[p], exportLengths[p]);

        if (sendType == Details::DISTRIBUTOR_ISEND) {
          // NOTE: This looks very similar to tmpSend above, but removing
          // tmpSendBuf and using tmpSend instead leads to a performance hit
          // on Arm SerialNode builds.
          exports_view_type tmpSendBuf =
              subview_offset(exports, exportStarts[p], exportLengths[p]);
          requestsSend_.push_back(isend<int>(tmpSendBuf, plan.getProcsTo()[p],
                                             mpiTag_, *plan.getComm()));
        }
        else { // DISTRIBUTOR_SEND
          send<int>(tmpSend,
                    as<int>(tmpSend.size()),
                    plan.getProcsTo()[p], mpiTag_, *plan.getComm());
        }
      }
      else { // "Sending" the message to myself
        selfNum = p;
      }
    }

    if (plan.hasSelfMessage()) {
      // This is how we "send a message to ourself": we copy from
      // the export buffer to the import buffer. That saves
      // Teuchos::Comm implementations other than MpiComm (in
      // particular, SerialComm) the trouble of implementing self
      // messages correctly. (To do this right, SerialComm would
      // need internal buffer space for messages, keyed on the
      // message's tag.)
      deep_copy_offset(imports, exports, selfReceiveOffset,
                       exportStarts[selfNum], exportLengths[selfNum]);
    }

  }
  else { // data are not blocked by proc, use send buffer
    ProfilingRegion psss("Tpetra::Distributor::doPostSends: MPI_Send SLOW");

    using Packet = typename ExpView::non_const_value_type;
    using Layout = typename ExpView::array_layout;
    using Device = typename ExpView::device_type;
    using Mem = typename ExpView::memory_traits;

    // This buffer is long enough for only one message at a time.
    // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
    // of sendType requested by user.
    // This code path formerly errored out with message:
    //     Tpetra::Distributor::doPosts(3 args):
    //     The "send buffer" code path
    //     doesn't currently work with nonblocking sends.
    // Now, we opt to just do the communication in a way that works.
#ifdef HAVE_TPETRA_DEBUG
    if (sendType != Details::DISTRIBUTOR_SEND) {
      if (plan.getComm()->getRank() == 0)
        std::cout << "The requested Tpetra send type "
                  << DistributorSendTypeEnumToString(sendType)
                  << " requires Distributor data to be ordered by"
                  << " the receiving processor rank. Since these"
                  << " data are not ordered, Tpetra will use Send"
                  << " instead." << std::endl;
    }
#endif

    size_t maxSendLength = 0;
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      size_t sendArrayOffset = 0;
      size_t j = plan.getStartsTo()[p];
      for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
        sendArrayOffset += exportLengths[j];
      }
      maxSendLength = std::max(maxSendLength, sendArrayOffset);
    }
    Kokkos::View<Packet*, Layout, Device, Mem> sendArray("sendArray", maxSendLength);

    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        size_t sendArrayOffset = 0;
        size_t j = plan.getStartsTo()[p];
        for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
          packOffset(sendArray, exports, sendArrayOffset, exportStarts[j], exportLengths[j]);
          sendArrayOffset += exportLengths[j];
        }
        typename ExpView::execution_space().fence();

        ImpView tmpSend =
            subview_offset(sendArray, size_t(0), sendArrayOffset);

        send<int>(tmpSend,
                  as<int>(tmpSend.size()),
                  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
      }
      else { // "Sending" the message to myself
        selfNum = p;
        selfIndex = plan.getStartsTo()[p];
      }
    }

    if (plan.hasSelfMessage()) {
      for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
        packOffset(imports, exports, selfReceiveOffset, exportStarts[selfIndex], exportLengths[selfIndex]);
        selfReceiveOffset += exportLengths[selfIndex];
        ++selfIndex;
      }
    }
  }
}

} // namespace Tpetra::Details

#endif // TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP