Tpetra_Details_DistributorActor.hpp
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 #ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
11 #define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
12 
13 #include <optional>
14 #include <vector>
15 
16 #include "Teuchos_Assert.hpp"
17 #include "Tpetra_Details_DistributorPlan.hpp"
18 #include "Tpetra_Util.hpp"
19 
20 #include "Teuchos_Array.hpp"
21 #include "Teuchos_Comm.hpp"
24 #include "Teuchos_RCP.hpp"
25 
26 #include "Kokkos_TeuchosCommAdapters.hpp"
27 
28 #ifdef HAVE_TPETRA_MPI
29 #include "mpi.h"
30 #include "Tpetra_Details_Ialltofewv.hpp"
31 #endif
32 
33 namespace Tpetra::Details {
34 
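// Convenience variable templates used by the static_asserts below to require
// that the data arrays passed to the doPosts* interfaces are Kokkos::Views.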
35 template <class View>
36 constexpr bool isKokkosView = Kokkos::is_view<View>::value;
37 
38 template <class View1, class View2>
39 constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;
40 
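// Executes the communication described by a DistributorPlan: posts nonblocking
// receives and sends (or dispatches to a collective such as MPI_Alltoallv or
// Ialltofewv, depending on the plan's send type), tracks the outstanding
// requests in requestsRecv_ / requestsSend_, and completes them via the
// doWaits* methods.  The plan describes *what* to communicate; the actor
// actually performs it.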
41 class DistributorActor {
42  using IndexView = DistributorPlan::IndexView;
43  using SubViewLimits = DistributorPlan::SubViewLimits;
44 
45  public:
46  static constexpr int DEFAULT_MPI_TAG = 1;
47 
48  DistributorActor();
49  DistributorActor(const DistributorActor& otherActor) = default;
50 
51  template <class ExpView, class ImpView>
52  void doPostsAndWaits(const DistributorPlan& plan,
53  const ExpView& exports,
54  size_t numPackets,
55  const ImpView& imports);
56 
57  template <class ExpView, class ImpView>
58  void doPostsAndWaits(const DistributorPlan& plan,
59  const ExpView& exports,
60  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
61  const ImpView& imports,
62  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
63 
64  template <class ImpView>
65  void doPostRecvs(const DistributorPlan& plan,
66  size_t numPackets,
67  const ImpView& imports);
68 
69  template <class ImpView>
70  void doPostRecvs(const DistributorPlan& plan,
71  const ImpView& imports,
72  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
73 
74  template <class ExpView, class ImpView>
75  void doPostSends(const DistributorPlan& plan,
76  const ExpView& exports,
77  size_t numPackets,
78  const ImpView& imports);
79 
80  template <class ExpView, class ImpView>
81  void doPostSends(const DistributorPlan& plan,
82  const ExpView& exports,
83  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
84  const ImpView& imports,
85  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
86 
87  template <class ExpView, class ImpView>
88  void doPosts(const DistributorPlan& plan,
89  const ExpView& exports,
90  size_t numPackets,
91  const ImpView& imports);
92 
93  template <class ExpView, class ImpView>
94  void doPosts(const DistributorPlan& plan,
95  const ExpView& exports,
96  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
97  const ImpView& imports,
98  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
99 
100  void doWaits(const DistributorPlan& plan);
101 
102  void doWaitsRecv(const DistributorPlan& plan);
103 
104  void doWaitsSend(const DistributorPlan& plan);
105 
106  void doWaitsIalltofewv(const DistributorPlan& plan);
107 
108  bool isReady() const;
109 
110  int getMpiTag() const { return mpiTag_; }
111 
112  private:
113  template <class ImpView>
114  void doPostRecvsImpl(const DistributorPlan& plan,
115  const ImpView& imports,
116  const SubViewLimits& totalPacketsFrom);
117 
118  template <class ExpView, class ImpView>
119  void doPostSendsImpl(const DistributorPlan& plan,
120  const ExpView& exports,
121  const SubViewLimits& exportSubViewLimits,
122  const ImpView& imports,
123  const SubViewLimits& importSubViewLimits);
124 
125 #ifdef HAVE_TPETRA_MPI
126  template <class ExpView, class ImpView>
127  void doPostsAllToAllImpl(const DistributorPlan& plan,
128  const ExpView& exports,
129  const SubViewLimits& exportSubViewLimits,
130  const ImpView& imports,
131  const SubViewLimits& importSubViewLimits);
132 
133 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
134  template <class ExpView, class ImpView>
135  void doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
136  const ExpView& exports,
137  const SubViewLimits& exportSubViewLimits,
138  const ImpView& imports,
139  const SubViewLimits& importSubViewLimits);
140 #endif // HAVE_TPETRACORE_MPI_ADVANCE
141 
142  template <typename ExpView, typename ImpView>
143  void doPostsIalltofewvImpl(const DistributorPlan& plan,
144  const ExpView& exports,
145  const SubViewLimits& exportSubViewLimits,
146  const ImpView& imports,
147  const SubViewLimits& importSubViewLimits);
148 
149  // ialltofewv members
150  struct {
151  Details::Ialltofewv impl;
152  std::optional<Details::Ialltofewv::Req> req;
153  Teuchos::RCP<std::vector<int>> sendcounts;
154  Teuchos::RCP<std::vector<int>> sdispls;
155  Teuchos::RCP<std::vector<int>> recvcounts;
156  Teuchos::RCP<std::vector<int>> rdispls;
157  std::vector<int> roots;
158  } ialltofewv_;
159 
160 #endif // HAVE_TPETRA_MPI
161 
162  int mpiTag_;
163 
164  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsRecv_;
165  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsSend_;
166 };
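// A minimal usage sketch (illustrative only): it assumes a DistributorPlan
// `plan` built elsewhere, Kokkos::Views `exports` / `imports` sized
// consistently with that plan, and `numPackets` packets per LID.  Receives are
// posted first, then sends, and doWaits() completes both; doPostsAndWaits()
// wraps the whole sequence in one blocking call.
//
//   Tpetra::Details::DistributorActor actor;
//   actor.doPostRecvs(plan, numPackets, imports);
//   actor.doPostSends(plan, exports, numPackets, imports);
//   // ... overlap communication with local work here ...
//   actor.doWaits(plan);
//   // Equivalently: actor.doPostsAndWaits(plan, exports, numPackets, imports);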
167 
168 template <class ExpView, class ImpView>
169 void DistributorActor::doPosts(const DistributorPlan& plan,
170  const ExpView& exports,
171  size_t numPackets,
172  const ImpView& imports) {
173  doPostRecvs(plan, numPackets, imports);
174  doPostSends(plan, exports, numPackets, imports);
175 }
176 
177 template <class ExpView, class ImpView>
178 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
179  const ExpView& exports,
180  size_t numPackets,
181  const ImpView& imports) {
182  static_assert(areKokkosViews<ExpView, ImpView>,
183  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
184  doPosts(plan, exports, numPackets, imports);
185  doWaits(plan);
186 }
187 
188 template <class ExpView, class ImpView>
189 void DistributorActor::doPosts(const DistributorPlan& plan,
190  const ExpView& exports,
191  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
192  const ImpView& imports,
193  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
194  doPostRecvs(plan, imports, numImportPacketsPerLID);
195  doPostSends(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
196 }
197 
198 template <class ExpView, class ImpView>
199 void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
200  const ExpView& exports,
201  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
202  const ImpView& imports,
203  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
204  static_assert(areKokkosViews<ExpView, ImpView>,
205  "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
206  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
207  doWaits(plan);
208 }
209 
210 template <typename ViewType>
211 using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;
212 
213 template <typename DstViewType, typename SrcViewType>
214 using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
215  HostAccessibility<SrcViewType>::accessible>;
216 
217 template <typename DstViewType, typename SrcViewType>
218 using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
219  !HostAccessibility<SrcViewType>::accessible>;
220 
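// packOffset copies `size` entries from src (starting at src_offset) into dst
// (starting at dst_offset).  The overload is selected at compile time via the
// aliases above: a plain memcpy when both Views are host-accessible, and a
// Kokkos::Compat::deep_copy_offset otherwise.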
221 template <typename DstViewType, typename SrcViewType>
222 enableIfHostAccessible<DstViewType, SrcViewType>
223 packOffset(const DstViewType& dst,
224  const SrcViewType& src,
225  const size_t dst_offset,
226  const size_t src_offset,
227  const size_t size) {
228  memcpy((void*)(dst.data() + dst_offset), src.data() + src_offset, size * sizeof(typename DstViewType::value_type));
229 }
230 
231 template <typename DstViewType, typename SrcViewType>
232 enableIfNotHostAccessible<DstViewType, SrcViewType>
233 packOffset(const DstViewType& dst,
234  const SrcViewType& src,
235  const size_t dst_offset,
236  const size_t src_offset,
237  const size_t size) {
238  Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
239 }
240 
241 #ifdef HAVE_TPETRA_MPI
242 
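// Ialltofewv path: every rank computes, per root, how much it sends and at what
// offset in `exports`; each root rank additionally fills rank-indexed
// recvcounts/rdispls from the plan's receive metadata.  A single nonblocking
// Ialltofewv is then posted, and its request is stored in ialltofewv_ for later
// completion (see doWaitsIalltofewv).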
243 template <class ExpView, class ImpView>
244 void DistributorActor::doPostsIalltofewvImpl(const DistributorPlan& plan,
245  const ExpView& exports,
246  const SubViewLimits& exportSubViewLimits,
247  const ImpView& imports,
248  const SubViewLimits& importSubViewLimits) {
249  using size_type = Teuchos::Array<size_t>::size_type;
250  using ExportValue = typename ExpView::non_const_value_type;
251 
252  ProfilingRegion pr("Tpetra::Distributor::doPostsIalltofewvImpl");
253 
254  TEUCHOS_TEST_FOR_EXCEPTION(
255  !plan.getIndicesTo().is_null(), std::runtime_error,
256  "Send Type=\"Ialltofewv\" only works for fast-path communication.");
257 
258  TEUCHOS_TEST_FOR_EXCEPTION(
259  bool(ialltofewv_.req), std::runtime_error,
260  "This actor has an active Ialltofewv already");
261 
262  TEUCHOS_TEST_FOR_EXCEPTION(
263  bool(ialltofewv_.sendcounts), std::runtime_error,
264  "This actor has an active Ialltofewv already");
265 
266  TEUCHOS_TEST_FOR_EXCEPTION(
267  bool(ialltofewv_.sdispls), std::runtime_error,
268  "This actor has an active Ialltofewv already");
269 
270  TEUCHOS_TEST_FOR_EXCEPTION(
271  bool(ialltofewv_.recvcounts), std::runtime_error,
272  "This actor has an active Ialltofewv already");
273 
274  TEUCHOS_TEST_FOR_EXCEPTION(
275  bool(ialltofewv_.rdispls), std::runtime_error,
276  "This actor has an active Ialltofewv already");
277 
278  auto comm = plan.getComm();
279 
280  const auto& [importStarts, importLengths] = importSubViewLimits;
281  const auto& [exportStarts, exportLengths] = exportSubViewLimits;
282 
283  ialltofewv_.roots = plan.getRoots();
284  const int nroots = ialltofewv_.roots.size();
285  const int* roots = ialltofewv_.roots.data();
286  ialltofewv_.req = std::make_optional<Details::Ialltofewv::Req>();
287  ialltofewv_.sendcounts = Teuchos::rcp(new std::vector<int>(nroots));
288  ialltofewv_.sdispls = Teuchos::rcp(new std::vector<int>(nroots));
289  ialltofewv_.recvcounts = Teuchos::rcp(new std::vector<int>);
290  ialltofewv_.rdispls = Teuchos::rcp(new std::vector<int>);
291 
292  for (int rootIdx = 0; rootIdx < nroots; ++rootIdx) {
293  const int root = roots[rootIdx];
294 
295  // If we can't find the root proc in our plan, it just means we send it 0 packets.
296  // This search assumes each root appears at most once in getProcsTo().
297  size_type rootProcIndex = plan.getProcsTo().size(); // sentinel value -> not found
298  for (size_type pi = 0; pi < plan.getProcsTo().size(); ++pi) {
299  if (plan.getProcsTo()[pi] == root) {
300  rootProcIndex = pi;
301  break;
302  }
303  }
304 
305  // am I sending to root?
306  int sendcount = 0;
307  if (rootProcIndex != plan.getProcsTo().size()) {
308  sendcount = exportLengths[rootProcIndex];
309  }
310  (*ialltofewv_.sendcounts)[rootIdx] = sendcount;
311 
312  int sdispl = 0;
313  if (0 != sendcount) {
314  sdispl = exportStarts[rootProcIndex];
315  }
316  (*ialltofewv_.sdispls)[rootIdx] = sdispl;
317 
318  // If I happen to be this root, set up my receive metadata
319  if (comm->getRank() == root) {
320  // don't recv anything from anywhere by default
321  ialltofewv_.recvcounts->resize(comm->getSize());
322  std::fill(ialltofewv_.recvcounts->begin(), ialltofewv_.recvcounts->end(), 0);
323  ialltofewv_.rdispls->resize(comm->getSize());
324  std::fill(ialltofewv_.rdispls->begin(), ialltofewv_.rdispls->end(), 0);
325 
326  const size_type actualNumReceives =
327  Teuchos::as<size_type>(plan.getNumReceives()) +
328  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
329 
330  for (size_type i = 0; i < actualNumReceives; ++i) {
331  const int src = plan.getProcsFrom()[i];
332  (*ialltofewv_.rdispls)[src] = importStarts[i];
333  (*ialltofewv_.recvcounts)[src] = Teuchos::as<int>(importLengths[i]);
334  }
335  }
336 
337  } // rootIdx
338 
339  // TODO: do we need to pass ExportValue{} here?
340  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<ExportValue>::getType(ExportValue{});
341  // FIXME: is there a better way to do this?
342  Teuchos::RCP<const Teuchos::MpiComm<int>> tMpiComm =
343  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
344  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> oMpiComm =
345  tMpiComm->getRawMpiComm();
346  MPI_Comm mpiComm = (*oMpiComm)();
347 
348  // We don't care about send-side accessibility because the send buffer is not
349  // accessed through Kokkos here; we rely on MPI to do the right thing.
350  constexpr bool recvDevAccess = Kokkos::SpaceAccessibility<
351  Kokkos::DefaultExecutionSpace, typename ImpView::memory_space>::accessible;
352  constexpr bool sendDevAccess = Kokkos::SpaceAccessibility<
353  Kokkos::DefaultExecutionSpace, typename ExpView::memory_space>::accessible;
354  static_assert(recvDevAccess == sendDevAccess, "sending across host/device");
355 
356  const int err = ialltofewv_.impl.post<recvDevAccess>(exports.data(), ialltofewv_.sendcounts->data(), ialltofewv_.sdispls->data(), rawType,
357  imports.data(), ialltofewv_.recvcounts->data(), ialltofewv_.rdispls->data(),
358  roots, nroots,
359  rawType,
360  mpiTag_, mpiComm, &(*ialltofewv_.req));
361 
362  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
363  "ialltofewv failed with error \""
364  << Teuchos::mpiErrorCodeToString(err)
365  << "\".");
366 }
367 
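// All-to-all path: builds rank-indexed sendcounts/sdispls/recvcounts/rdispls
// arrays from the plan and the sub-view limits, checks that every count fits
// in an int, and then performs a single blocking MPI_Alltoallv (or
// MPIX_Alltoallv when the MPI Advance all-to-all send type is selected), so no
// nonblocking requests are created on this path.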
368 template <class ExpView, class ImpView>
369 void DistributorActor::doPostsAllToAllImpl(const DistributorPlan& plan,
370  const ExpView& exports,
371  const SubViewLimits& exportSubViewLimits,
372  const ImpView& imports,
373  const SubViewLimits& importSubViewLimits) {
374  TEUCHOS_TEST_FOR_EXCEPTION(
375  !plan.getIndicesTo().is_null(), std::runtime_error,
376  "Send Type=\"Alltoall\" only works for fast-path communication.");
377 
378  using size_type = Teuchos::Array<size_t>::size_type;
379 
380  auto comm = plan.getComm();
381  std::vector<int> sendcounts(comm->getSize(), 0);
382  std::vector<int> sdispls(comm->getSize(), 0);
383  std::vector<int> recvcounts(comm->getSize(), 0);
384  std::vector<int> rdispls(comm->getSize(), 0);
385 
386  auto& [importStarts, importLengths] = importSubViewLimits;
387  auto& [exportStarts, exportLengths] = exportSubViewLimits;
388 
389  for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
390  sdispls[plan.getProcsTo()[pp]] = exportStarts[pp];
391  size_t numPackets = exportLengths[pp];
392  // numPackets is converted down to int, so make sure it can be represented
393  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
394  "Tpetra::Distributor::doPostsAllToAll: "
395  "Send count for send "
396  << pp << " (" << numPackets
397  << ") is too large "
398  "to be represented as int.");
399  sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
400  }
401 
402  const size_type actualNumReceives =
403  Teuchos::as<size_type>(plan.getNumReceives()) +
404  Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
405 
406  for (size_type i = 0; i < actualNumReceives; ++i) {
407  rdispls[plan.getProcsFrom()[i]] = importStarts[i];
408  size_t totalPacketsFrom_i = importLengths[i];
409  // totalPacketsFrom_i is converted down to int, so make sure it can be
410  // represented
411  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
412  std::logic_error,
413  "Tpetra::Distributor::doPostsAllToAll: "
414  "Recv count for receive "
415  << i << " (" << totalPacketsFrom_i
416  << ") is too large "
417  "to be represented as int.");
418  recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
419  }
420 
421  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
422  Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
423  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
424  mpiComm->getRawMpiComm();
425  using T = typename ExpView::non_const_value_type;
426  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
427 
428 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
429  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
430  MPIX_Comm* mpixComm = *plan.getMPIXComm();
431  TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
432  "MPIX_Comm is null in doPostsAllToAll at "
433  << __FILE__ << ":" << __LINE__);
434 
435  const int err = MPIX_Alltoallv(
436  exports.data(), sendcounts.data(), sdispls.data(), rawType,
437  imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
438 
439  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
440  "MPIX_Alltoallv failed with error \""
441  << Teuchos::mpiErrorCodeToString(err)
442  << "\".");
443 
444  return;
445  }
446 #endif // HAVE_TPETRACORE_MPI_ADVANCE
447 
448  const int err = MPI_Alltoallv(
449  exports.data(), sendcounts.data(), sdispls.data(), rawType,
450  imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
451 
452  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
453  "MPI_Alltoallv failed with error \""
454  << Teuchos::mpiErrorCodeToString(err)
455  << "\".");
456 }
457 
458 #if defined(HAVE_TPETRACORE_MPI_ADVANCE)
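// Neighbor all-to-all path (MPI Advance): counts and displacements are indexed
// by neighbor position (the order of getProcsFrom() / getProcsTo()) rather than
// by rank, and the exchange is a blocking MPIX_Neighbor_alltoallv_topo over a
// topology built from the plan's send and receive lists.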
459 template <class ExpView, class ImpView>
460 void DistributorActor::doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
461  const ExpView& exports,
462  const SubViewLimits& exportSubViewLimits,
463  const ImpView& imports,
464  const SubViewLimits& importSubViewLimits) {
465  TEUCHOS_TEST_FOR_EXCEPTION(
466  !plan.getIndicesTo().is_null(), std::runtime_error,
467  "Send Type=\"Alltoall\" only works for fast-path communication.");
468 
469  const int myRank = plan.getComm()->getRank();
470  MPIX_Comm* mpixComm = *plan.getMPIXComm();
471  using size_type = Teuchos::Array<size_t>::size_type;
472 
473  const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
474  const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
475  std::vector<int> sendcounts(numSends, 0);
476  std::vector<int> sdispls(numSends, 0);
477  std::vector<int> recvcounts(numRecvs, 0);
478  std::vector<int> rdispls(numRecvs, 0);
479 
480  auto& [importStarts, importLengths] = importSubViewLimits;
481  auto& [exportStarts, exportLengths] = exportSubViewLimits;
482 
483  for (size_t pp = 0; pp < numSends; ++pp) {
484  sdispls[pp] = exportStarts[pp];
485  size_t numPackets = exportLengths[pp];
486  // numPackets is converted down to int, so make sure it can be represented
487  TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
488  "Tpetra::Distributor::doPostsNbrAllToAllV: "
489  "Send count for send "
490  << pp << " (" << numPackets
491  << ") is too large "
492  "to be represented as int.");
493  sendcounts[pp] = static_cast<int>(numPackets);
494  }
495 
496  for (size_type i = 0; i < numRecvs; ++i) {
497  rdispls[i] = importStarts[i];
498  size_t totalPacketsFrom_i = importLengths[i];
499  // totalPacketsFrom_i is converted down to int, so make sure it can be
500  // represented
501  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
502  std::logic_error,
503  "Tpetra::Distributor::doPostsNbrAllToAllV: "
504  "Recv count for receive "
505  << i << " (" << totalPacketsFrom_i
506  << ") is too large "
507  "to be represented as int.");
508  recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
509  }
510 
511  using T = typename ExpView::non_const_value_type;
512  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
513 
514  MPIX_Info* xinfo;
515  MPIX_Topo* xtopo;
516  MPIX_Info_init(&xinfo);
517  MPIX_Topo_init(numRecvs, plan.getProcsFrom().data(), recvcounts.data(),
518  numSends, plan.getProcsTo().data(), sendcounts.data(), xinfo, &xtopo);
519  const int err = MPIX_Neighbor_alltoallv_topo(
520  exports.data(), sendcounts.data(), sdispls.data(), rawType,
521  imports.data(), recvcounts.data(), rdispls.data(), rawType, xtopo, mpixComm);
522  MPIX_Topo_free(&xtopo);
523  MPIX_Info_free(&xinfo);
524 
525  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
526  "MPIX_Neighbor_alltoallv failed with error \""
527  << Teuchos::mpiErrorCodeToString(err)
528  << "\".");
529 }
530 #endif // HAVE_TPETRACORE_MPI_ADVANCE
531 #endif // HAVE_TPETRA_MPI
532 
533 template <class ImpView>
534 void DistributorActor::doPostRecvs(const DistributorPlan& plan,
535  size_t numPackets,
536  const ImpView& imports) {
537  auto importSubViewLimits = plan.getImportViewLimits(numPackets);
538  doPostRecvsImpl(plan, imports, importSubViewLimits);
539 }
540 
541 template <class ImpView>
542 void DistributorActor::doPostRecvs(const DistributorPlan& plan,
543  const ImpView& imports,
544  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
545  auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
546  doPostRecvsImpl(plan, imports, importSubViewLimits);
547 }
548 
549 template <class ImpView>
550 void DistributorActor::doPostRecvsImpl(const DistributorPlan& plan,
551  const ImpView& imports,
552  const SubViewLimits& importSubViewLimits) {
553  static_assert(isKokkosView<ImpView>,
554  "Data arrays for DistributorActor::doPostRecvs must be Kokkos::Views");
555  using Kokkos::Compat::subview_offset;
556  using Teuchos::Array;
557  using Teuchos::as;
558  using Teuchos::ireceive;
559  using size_type = Array<size_t>::size_type;
560  using imports_view_type = ImpView;
561 
562  // Set the MPI message tag for this round of communication.
563  // The same tag will be used for recvs and sends. For every round of communication,
564  // the tag is incremented by one, until it eventually wraps back around to a
565  // small value. This logic is implemented in Teuchos.
566  auto comm = plan.getComm();
567  {
568  auto non_const_comm = Teuchos::rcp_const_cast<Teuchos::Comm<int>>(comm);
569  mpiTag_ = non_const_comm->incrementTag();
570  }
571 
572 #ifdef KOKKOS_ENABLE_CUDA
573  static_assert(!std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
574  "Please do not use Tpetra::Distributor with UVM "
575  "allocations. See GitHub issue #1088.");
576 #endif // KOKKOS_ENABLE_CUDA
577 
578 #ifdef KOKKOS_ENABLE_SYCL
579  static_assert(!std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
580  "Please do not use Tpetra::Distributor with SharedUSM "
581  "allocations. See GitHub issue #1088 (corresponding to CUDA).");
582 #endif // KOKKOS_ENABLE_SYCL
583 
584 #if defined(HAVE_TPETRA_MPI)
585  // All-to-all communication layout is quite different from
586  // point-to-point, so we handle it separately.
587 
588  // These send options require no matching receives, so we just return.
589  const Details::EDistributorSendType sendType = plan.getSendType();
590  if ((sendType == Details::DISTRIBUTOR_ALLTOALL) || (sendType == Details::DISTRIBUTOR_IALLTOFEWV)
591 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
592  || (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) || (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV)
593 #endif
594  ) {
595  return;
596  }
597 #endif // HAVE_TPETRA_MPI
598 
599  ProfilingRegion pr("Tpetra::Distributor::doPostRecvs");
600 
601  const int myProcID = plan.getComm()->getRank();
602 
603  auto& [importStarts, importLengths] = importSubViewLimits;
604 
605  // Distributor uses requestsRecv_.size() and requestsSend_.size()
606  // as the number of outstanding nonblocking message requests, so
607  // we resize to zero to maintain this invariant.
608  //
609  // getNumReceives() does _not_ include the self message, if there is
610  // one. Here, we do actually send a message to ourselves, so we
611  // include any self message in the "actual" number of receives to
612  // post.
613  //
614  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
615  // doesn't (re)allocate its array of requests. That happens in
616  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
617  // demand), or Resize_().
618  const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
619  as<size_type>(plan.hasSelfMessage() ? 1 : 0);
620 
621 #ifdef HAVE_TPETRA_DEBUG
622  size_t totalNumImportPackets = 0;
623  for (size_t i = 0; i < Teuchos::as<size_t>(actualNumReceives); ++i) {
624  totalNumImportPackets += importLengths[i];
625  }
626  TEUCHOS_TEST_FOR_EXCEPTION(
627  imports.extent(0) < totalNumImportPackets, std::runtime_error,
628  "Tpetra::Distributor::doPostRecvs: The 'imports' array must have "
629  "enough entries to hold the expected number of import packets. "
630  "imports.extent(0) = "
631  << imports.extent(0) << " < "
632  "totalNumImportPackets = "
633  << totalNumImportPackets << ".");
634  TEUCHOS_TEST_FOR_EXCEPTION(!requestsRecv_.empty(), std::logic_error,
635  "Tpetra::Distributor::"
636  "doPostRecvs: Process "
637  << myProcID << ": requestsRecv_.size () = "
638  << requestsRecv_.size() << " != 0.");
639 #endif // HAVE_TPETRA_DEBUG
640 
641  requestsRecv_.resize(0);
642 
643  // Post the nonblocking receives. It's common MPI wisdom to post
644  // receives before sends. In MPI terms, this means favoring
645  // adding to the "posted queue" (of receive requests) over adding
646  // to the "unexpected queue" (of arrived messages not yet matched
647  // with a receive).
648  {
649  ProfilingRegion prr("Tpetra::Distributor::doPostRecvs MPI_Irecv");
650 
651  for (size_type i = 0; i < actualNumReceives; ++i) {
652  size_t totalPacketsFrom_i = importLengths[Teuchos::as<size_t>(i)];
653  TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
654  std::logic_error,
655  "Tpetra::Distributor::doPostRecvs: "
656  "Recv count for receive "
657  << i << " (" << totalPacketsFrom_i << ") is too large "
658  "to be represented as int.");
659  if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
660  // If my process is receiving these packet(s) from another
661  // process (not a self-receive), and if there is at least
662  // one packet to receive:
663  //
664  // 1. Set up the persisting view (recvBuf) into the imports
665  // array, given the offset and size (total number of
666  // packets from process getProcsFrom()[i]).
667  // 2. Start the Irecv and save the resulting request.
668  imports_view_type recvBuf =
669  subview_offset(imports, importStarts[i], totalPacketsFrom_i);
670  requestsRecv_.push_back(ireceive<int>(recvBuf, plan.getProcsFrom()[i],
671  mpiTag_, *plan.getComm()));
672  }
673  }
674  }
675 }
676 
677 template <class ExpView, class ImpView>
678 void DistributorActor::doPostSends(const DistributorPlan& plan,
679  const ExpView& exports,
680  size_t numPackets,
681  const ImpView& imports) {
682  auto exportSubViewLimits = plan.getExportViewLimits(numPackets);
683  auto importSubViewLimits = plan.getImportViewLimits(numPackets);
684  doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
685 }
686 
687 template <class ExpView, class ImpView>
688 void DistributorActor::doPostSends(const DistributorPlan& plan,
689  const ExpView& exports,
690  const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
691  const ImpView& imports,
692  const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
693  auto exportSubViewLimits = plan.getExportViewLimits(numExportPacketsPerLID);
694  auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
695  doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
696 }
697 
698 template <class ExpView, class ImpView>
699 void DistributorActor::doPostSendsImpl(const DistributorPlan& plan,
700  const ExpView& exports,
701  const SubViewLimits& exportSubViewLimits,
702  const ImpView& imports,
703  const SubViewLimits& importSubViewLimits) {
704  static_assert(areKokkosViews<ExpView, ImpView>,
705  "Data arrays for DistributorActor::doPostSends must be Kokkos::Views");
706  using Kokkos::Compat::deep_copy_offset;
707  using Kokkos::Compat::subview_offset;
708  using Teuchos::Array;
709  using Teuchos::as;
710  using Teuchos::isend;
711  using Teuchos::send;
712  using size_type = Array<size_t>::size_type;
713  using exports_view_type = ExpView;
714 
715 #ifdef KOKKOS_ENABLE_CUDA
716  static_assert(!std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
717  !std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
718  "Please do not use Tpetra::Distributor with UVM allocations. "
719  "See Trilinos GitHub issue #1088.");
720 #endif // KOKKOS_ENABLE_CUDA
721 
722 #ifdef KOKKOS_ENABLE_SYCL
723  static_assert(!std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
724  !std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
725  "Please do not use Tpetra::Distributor with SharedUSM allocations. "
726  "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
727 #endif // KOKKOS_ENABLE_SYCL
728 
729  ProfilingRegion ps("Tpetra::Distributor::doPostSends");
730 
731  const int myRank = plan.getComm()->getRank();
732  // Run-time configurable parameters that come from the input
733  // ParameterList set by setParameterList().
734  const Details::EDistributorSendType sendType = plan.getSendType();
735 
736  auto& [exportStarts, exportLengths] = exportSubViewLimits;
737  auto& [importStarts, importLengths] = importSubViewLimits;
738 
739 #if defined(HAVE_TPETRA_MPI)
740  // All-to-all communication layout is quite different from
741  // point-to-point, so we handle it separately.
742 
743  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
744  doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
745  return;
746  } else if (sendType == Details::DISTRIBUTOR_IALLTOFEWV) {
747  doPostsIalltofewvImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
748  return;
749  }
750 #ifdef HAVE_TPETRACORE_MPI_ADVANCE
751  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
752  doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
753  return;
754  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
755  doPostsNbrAllToAllVImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
756  return;
757  }
758 #endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)
759 
760 #else // HAVE_TPETRA_MPI
761  if (plan.hasSelfMessage()) {
762  // This is how we "send a message to ourself": we copy from
763  // the export buffer to the import buffer. That saves
764  // Teuchos::Comm implementations other than MpiComm (in
765  // particular, SerialComm) the trouble of implementing self
766  // messages correctly. (To do this right, SerialComm would
767  // need internal buffer space for messages, keyed on the
768  // message's tag.)
769  size_t selfReceiveOffset = 0;
770  deep_copy_offset(imports, exports, selfReceiveOffset,
771  exportStarts[0],
772  exportLengths[0]);
773  }
774  // should we just return here?
775  // likely not as comm could be a serial comm
776 #endif // HAVE_TPETRA_MPI
777 
778  size_t selfReceiveOffset = 0;
779 
780 #ifdef HAVE_TPETRA_DEBUG
781  TEUCHOS_TEST_FOR_EXCEPTION(requestsSend_.size() != 0,
782  std::logic_error,
783  "Tpetra::Distributor::doPostSends: Process "
784  << myRank << ": requestsSend_.size() = " << requestsSend_.size() << " != 0.");
785 #endif // HAVE_TPETRA_DEBUG
786 
787  // Distributor uses requestsRecv_.size() and requestsSend_.size()
788  // as the number of outstanding nonblocking message requests, so
789  // we resize to zero to maintain this invariant.
790  //
791  // getNumReceives() does _not_ include the self message, if there is
792  // one. Here, we do actually send a message to ourselves, so we
793  // include any self message in the "actual" number of receives to
794  // post.
795  //
796  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
797  // doesn't (re)allocate its array of requests. That happens in
798  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
799  // demand), or Resize_().
800  const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
801  as<size_type>(plan.hasSelfMessage() ? 1 : 0);
802  requestsSend_.resize(0);
803 
804  {
805  for (size_type i = 0; i < actualNumReceives; ++i) {
806  if (plan.getProcsFrom()[i] == myRank) { // Receiving from myself
807  selfReceiveOffset = importStarts[i]; // Remember the self-recv offset
808  }
809  }
810  }
811 
812  ProfilingRegion pss("Tpetra::Distributor::doPostSends sends");
813 
814  // Set up a scan through the getProcsTo() list, starting with higher-numbered procs
815  // (this should help balance message traffic).
816  //
817  // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
818  // It doesn't depend on the input at all.
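 // Example (illustrative): with getProcsTo() = {0, 2, 5, 7} and myRank = 4,
 // procIndex ends up at 2, so this rank posts its sends to procs 5, 7, 0, 2,
 // in that order.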
819  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
820  size_t procIndex = 0;
821  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
822  ++procIndex;
823  }
824  if (procIndex == numBlocks) {
825  procIndex = 0;
826  }
827 
828  size_t selfNum = 0;
829  size_t selfIndex = 0;
830 
831  if (plan.getIndicesTo().is_null()) {
832  const char isend_region[] = "Tpetra::Distributor::doPostSends MPI_Isend FAST";
833  const char send_region[] = "Tpetra::Distributor::doPostSends MPI_Send FAST";
834  ProfilingRegion pssf((sendType == Details::DISTRIBUTOR_ISEND) ? isend_region : send_region);
835 
836  // Data are already blocked (laid out) by process, so we don't
837  // need a separate send buffer (besides the exports array).
838  for (size_t i = 0; i < numBlocks; ++i) {
839  size_t p = i + procIndex;
840  if (p > (numBlocks - 1)) {
841  p -= numBlocks;
842  }
843 
844  if (plan.getProcsTo()[p] != myRank) {
845  if (exportLengths[p] == 0) {
846  // Do not attempt to send messages of length 0.
847  continue;
848  }
849 
850  exports_view_type tmpSend = subview_offset(exports, exportStarts[p], exportLengths[p]);
851 
852  if (sendType == Details::DISTRIBUTOR_ISEND) {
853  // NOTE: This looks very similar to tmpSend above, but removing
854  // tmpSendBuf and using tmpSend instead leads to a performance hit on Arm
855  // SerialNode builds.
856  exports_view_type tmpSendBuf =
857  subview_offset(exports, exportStarts[p], exportLengths[p]);
858  requestsSend_.push_back(isend<int>(tmpSendBuf, plan.getProcsTo()[p],
859  mpiTag_, *plan.getComm()));
860  } else { // DISTRIBUTOR_SEND
861  send<int>(tmpSend,
862  as<int>(tmpSend.size()),
863  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
864  }
865  } else { // "Sending" the message to myself
866  selfNum = p;
867  }
868  }
869 
870  if (plan.hasSelfMessage()) {
871  // This is how we "send a message to ourself": we copy from
872  // the export buffer to the import buffer. That saves
873  // Teuchos::Comm implementations other than MpiComm (in
874  // particular, SerialComm) the trouble of implementing self
875  // messages correctly. (To do this right, SerialComm would
876  // need internal buffer space for messages, keyed on the
877  // message's tag.)
878  deep_copy_offset(imports, exports, selfReceiveOffset,
879  exportStarts[selfNum], exportLengths[selfNum]);
880  }
881 
882  } else { // data are not blocked by proc, use send buffer
883  ProfilingRegion psss("Tpetra::Distributor::doPostSends: MPI_Send SLOW");
884 
885  using Packet = typename ExpView::non_const_value_type;
886  using Layout = typename ExpView::array_layout;
887  using Device = typename ExpView::device_type;
888  using Mem = typename ExpView::memory_traits;
889 
890  // This buffer is long enough for only one message at a time.
891  // Thus, we always use DISTRIBUTOR_SEND in this case, regardless
892  // of the sendType requested by the user.
893  // This code path formerly errored out with the message:
894  // Tpetra::Distributor::doPosts(3 args):
895  // The "send buffer" code path
896  // doesn't currently work with nonblocking sends.
897  // Now, we opt to just do the communication in a way that works.
898 #ifdef HAVE_TPETRA_DEBUG
899  if (sendType != Details::DISTRIBUTOR_SEND) {
900  if (plan.getComm()->getRank() == 0)
901  std::cout << "The requested Tpetra send type "
902  << DistributorSendTypeEnumToString(sendType)
903  << " requires Distributor data to be ordered by"
904  << " the receiving processor rank. Since these"
905  << " data are not ordered, Tpetra will use Send"
906  << " instead." << std::endl;
907  }
908 #endif
909 
910  size_t maxSendLength = 0;
911  for (size_t i = 0; i < numBlocks; ++i) {
912  size_t p = i + procIndex;
913  if (p > (numBlocks - 1)) {
914  p -= numBlocks;
915  }
916 
917  size_t sendArrayOffset = 0;
918  size_t j = plan.getStartsTo()[p];
919  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
920  sendArrayOffset += exportLengths[j];
921  }
922  maxSendLength = std::max(maxSendLength, sendArrayOffset);
923  }
924  Kokkos::View<Packet*, Layout, Device, Mem> sendArray("sendArray", maxSendLength);
925 
926  for (size_t i = 0; i < numBlocks; ++i) {
927  size_t p = i + procIndex;
928  if (p > (numBlocks - 1)) {
929  p -= numBlocks;
930  }
931 
932  if (plan.getProcsTo()[p] != myRank) {
933  size_t sendArrayOffset = 0;
934  size_t j = plan.getStartsTo()[p];
935  for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
936  packOffset(sendArray, exports, sendArrayOffset, exportStarts[j], exportLengths[j]);
937  sendArrayOffset += exportLengths[j];
938  }
939  typename ExpView::execution_space().fence();
940 
941  ImpView tmpSend =
942  subview_offset(sendArray, size_t(0), sendArrayOffset);
943 
944  send<int>(tmpSend,
945  as<int>(tmpSend.size()),
946  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
947  } else { // "Sending" the message to myself
948  selfNum = p;
949  selfIndex = plan.getStartsTo()[p];
950  }
951  }
952 
953  if (plan.hasSelfMessage()) {
954  for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
955  packOffset(imports, exports, selfReceiveOffset, exportStarts[selfIndex], exportLengths[selfIndex]);
956  selfReceiveOffset += exportLengths[selfIndex];
957  ++selfIndex;
958  }
959  }
960  }
961 }
962 
963 } // namespace Tpetra::Details
964 
965 #endif