Tpetra parallel linear algebra  Version of the Day
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Tpetra_Details_Ialltofewv.cpp
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 #include "Tpetra_Details_Ialltofewv.hpp"
11 
12 #include <algorithm>
13 #include <cmath>
14 #include <vector>
15 
16 #include <mpi.h>
17 #include <Kokkos_Core.hpp>
18 
19 #ifndef NDEBUG
20 #include <iostream>
21 #include <sstream>
22 #endif
23 
24 namespace {
25 
26 struct ProfilingRegion {
27  ProfilingRegion() = delete;
28  ProfilingRegion(const ProfilingRegion &other) = delete;
29  ProfilingRegion(ProfilingRegion &&other) = delete;
30 
31  ProfilingRegion(const std::string &name) {
32  Kokkos::Profiling::pushRegion(name);
33  }
34  ~ProfilingRegion() {
35  Kokkos::Profiling::popRegion();
36  }
37 };
38 
39 struct MemcpyArg {
40  void *dst;
41  void *src;
42  size_t count;
43 };
44 
45 template <typename T>
46 KOKKOS_INLINE_FUNCTION bool is_compatible(const MemcpyArg &arg) {
47  return (0 == (uintptr_t(arg.dst) % sizeof(T))) && (0 == (uintptr_t(arg.src) & sizeof(T))) && (0 == (arg.count % sizeof(T)));
48 }
49 
50 template <typename T, typename Member>
51 KOKKOS_INLINE_FUNCTION void team_memcpy_as(const Member &member, void *dst, void *const src, size_t count) {
52  Kokkos::parallel_for(
53  Kokkos::TeamThreadRange(member, count),
54  [&](size_t i) {
55  reinterpret_cast<T *>(dst)[i] = reinterpret_cast<T const *>(src)[i];
56  });
57 }
58 
59 template <typename Member>
60 KOKKOS_INLINE_FUNCTION void team_memcpy(const Member &member, MemcpyArg &arg) {
61  if (is_compatible<uint64_t>(arg)) {
62  team_memcpy_as<uint64_t>(member, arg.dst, arg.src, arg.count / sizeof(uint64_t));
63  } else if (is_compatible<uint32_t>(arg)) {
64  team_memcpy_as<uint32_t>(member, arg.dst, arg.src, arg.count / sizeof(uint32_t));
65  } else {
66  team_memcpy_as<uint8_t>(member, arg.dst, arg.src, arg.count);
67  }
68 }
69 
70 } // namespace
71 
72 namespace Tpetra::Details {
73 
74 struct Ialltofewv::Cache::impl {
75  impl()
76  : rootBufDev("rootBufDev", 0)
77  , rootBufHost("rootBufHost", 0)
78  , aggBufDev("aggBufDev", 0)
79  , aggBufHost("rootBufHost", 0)
80  , argsDev("argsDev", 0)
81  , argsHost("argsHost", 0)
82  , rootBufGets_(0)
83  , rootBufHits_(0)
84  , aggBufGets_(0)
85  , aggBufHits_(0)
86  , argsGets_(0)
87  , argsHits_(0)
88  , rootBufDevSize_(0)
89  , aggBufDevSize_(0)
90  , argsDevSize_(0)
91  , argsHostSize_(0)
92  , rootBufHostSize_(0)
93  , aggBufHostSize_(0) {}
94 
95  // cached views
96  Kokkos::View<uint8_t *, typename Kokkos::DefaultExecutionSpace::memory_space> rootBufDev;
97  Kokkos::View<uint8_t *, typename Kokkos::DefaultHostExecutionSpace::memory_space> rootBufHost;
98  Kokkos::View<char *, typename Kokkos::DefaultExecutionSpace::memory_space> aggBufDev;
99  Kokkos::View<char *, typename Kokkos::DefaultHostExecutionSpace::memory_space> aggBufHost;
100  Kokkos::View<MemcpyArg *, typename Kokkos::DefaultExecutionSpace::memory_space> argsDev;
101  Kokkos::View<MemcpyArg *, typename Kokkos::DefaultHostExecutionSpace::memory_space> argsHost;
102 
103  size_t rootBufGets_;
104  size_t rootBufHits_;
105  size_t aggBufGets_;
106  size_t aggBufHits_;
107  size_t argsGets_;
108  size_t argsHits_;
109 
110  size_t rootBufDevSize_, aggBufDevSize_;
111  size_t argsDevSize_, argsHostSize_;
112  size_t rootBufHostSize_, aggBufHostSize_;
113 
114  template <typename ExecSpace>
115  auto get_rootBuf(size_t size) {
116  ++rootBufGets_;
117  if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
118  if (rootBufDev.extent(0) < size) {
119  Kokkos::resize(Kokkos::WithoutInitializing, rootBufDev, size);
120  rootBufDevSize_ = size;
121  } else {
122  ++rootBufHits_;
123  }
124  return Kokkos::subview(rootBufDev, Kokkos::pair{size_t(0), size});
125  } else {
126  if (rootBufHost.extent(0) < size) {
127  Kokkos::resize(Kokkos::WithoutInitializing, rootBufHost, size);
128  rootBufHostSize_ = size;
129  } else {
130  ++rootBufHits_;
131  }
132  return Kokkos::subview(rootBufHost, Kokkos::pair{size_t(0), size});
133  }
134  }
135 
136  template <typename ExecSpace>
137  auto get_aggBuf(size_t size) {
138  ++aggBufGets_;
139  if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
140  if (aggBufDev.extent(0) < size) {
141  Kokkos::resize(Kokkos::WithoutInitializing, aggBufDev, size);
142  aggBufHostSize_ = size;
143  } else {
144  ++aggBufHits_;
145  }
146  return Kokkos::subview(aggBufDev, Kokkos::pair{size_t(0), size});
147  } else {
148  if (aggBufHost.extent(0) < size) {
149  Kokkos::resize(Kokkos::WithoutInitializing, aggBufHost, size);
150  aggBufHostSize_ = size;
151  } else {
152  ++aggBufHits_;
153  }
154  return Kokkos::subview(aggBufHost, Kokkos::pair{size_t(0), size});
155  }
156  }
157 
158  template <typename ExecSpace>
159  auto get_args(size_t size) {
160  ++argsGets_;
161  if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
162  if (argsDev.extent(0) < size) {
163  Kokkos::resize(Kokkos::WithoutInitializing, argsDev, size);
164  argsHostSize_ = size;
165  } else {
166  ++argsHits_;
167  }
168  return Kokkos::subview(argsDev, Kokkos::pair{size_t(0), size});
169  } else {
170  if (argsHost.extent(0) < size) {
171  Kokkos::resize(Kokkos::WithoutInitializing, argsHost, size);
172  argsHostSize_ = size;
173  } else {
174  ++argsHits_;
175  }
176  return Kokkos::subview(argsHost, Kokkos::pair{size_t(0), size});
177  }
178  }
179 };
180 
181 Ialltofewv::Cache::Cache() = default;
182 Ialltofewv::Cache::~Cache() = default;
183 
184 namespace {
185 template <typename RecvExecSpace>
186 int wait_impl(Ialltofewv::Req &req, Ialltofewv::Cache &cache) {
187  auto finalize = [&]() -> int {
188  req.completed = true;
189  return MPI_SUCCESS;
190  };
191 
192  if (0 == req.nroots) {
193  return finalize();
194  }
195 
196  ProfilingRegion pr("alltofewv::wait");
197 
198  // lazy-init view cache
199  if (!cache.pimpl) {
200  cache.pimpl = std::make_shared<Ialltofewv::Cache::impl>();
201  }
202 
203  const int rank = [&]() -> int {
204  int _rank;
205  MPI_Comm_rank(req.comm, &_rank);
206  return _rank;
207  }();
208 
209  const int size = [&]() -> int {
210  int _size;
211  MPI_Comm_size(req.comm, &_size);
212  return _size;
213  }();
214 
215  const size_t sendSize = [&]() -> size_t {
216  int _size;
217  MPI_Type_size(req.sendtype, &_size);
218  return _size;
219  }();
220 
221  const size_t recvSize = [&]() -> size_t {
222  int _size;
223  MPI_Type_size(req.recvtype, &_size);
224  return _size;
225  }();
226 
227  // is this rank a root? linear search - nroots expected to be small
228  const bool isRoot = std::find(req.roots, req.roots + req.nroots, rank) != req.roots + req.nroots;
229 
230  const int AGG_TAG = req.tag + 0;
231  const int ROOT_TAG = req.tag + 1;
232 
233  // Balance the number of incoming messages at each phase:
234  // Aggregation = size / naggs * nroots
235  // Root = naggs
236  // so
237  // size / naggs * nroots = naggs
238  // size * nroots = naggs^2
239  // naggs = sqrt(size * nroots)
240  const int naggs = std::sqrt(size_t(size) * size_t(req.nroots)) + /*rounding*/ 0.5;
241 
242  // how many srcs go to each aggregator
243  const int srcsPerAgg = (size + naggs - 1) / naggs;
244 
245  // the aggregator I send to
246  const int myAgg = rank / srcsPerAgg * srcsPerAgg;
247 
248  // ensure aggregators know how much data each rank is sending to the root
249  // [si * nroots + r1] -> how much si'th rank in group wants to send to root ri
250  std::vector<int> groupSendCounts(size_t(req.nroots) * size_t(srcsPerAgg));
251  std::vector<MPI_Request> reqs;
252  if (rank == myAgg) {
253  reqs.reserve(srcsPerAgg);
254  // recv counts from each member of my group
255  for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
256  MPI_Request rreq;
257 
258 #ifndef NDEBUG
259  if (size_t(si) * req.nroots + req.nroots > groupSendCounts.size()) {
260  std::stringstream ss;
261  ss << __FILE__ << ":" << __LINE__
262  << " [" << rank << "] tpetra internal Ialltofewv error: OOB access in recv buffer\n";
263  std::cerr << ss.str();
264  }
265 #endif
266  MPI_Irecv(&groupSendCounts[size_t(si) * size_t(req.nroots)], req.nroots, MPI_INT, si + rank,
267  req.tag, req.comm, &rreq);
268  reqs.push_back(rreq);
269  }
270  }
271  // send sendcounts to aggregator
272  MPI_Send(req.sendcounts, req.nroots, MPI_INT, myAgg, req.tag, req.comm);
273 
274  MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
275  reqs.resize(0);
276 
277  // at this point, in each aggregator, groupSendCounts holds the send counts
278  // from each member of the aggregator. The first nroots entries are from the first rank,
279  // the second nroots entries are from the second rank, etc
280 
281  // a temporary buffer to aggregate data. Data for a root is contiguous.
282  auto aggBuf = cache.pimpl->get_aggBuf<RecvExecSpace>(0);
283  std::vector<size_t> rootCount(req.nroots, 0); // [ri] the count of data held for root ri
284  if (rank == myAgg) {
285  size_t aggBytes = 0;
286  for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
287  for (int ri = 0; ri < req.nroots; ++ri) {
288  int count = groupSendCounts[si * req.nroots + ri];
289  rootCount[ri] += count;
290  aggBytes += count * sendSize;
291  }
292  }
293 
294  aggBuf = cache.pimpl->get_aggBuf<RecvExecSpace>(aggBytes);
295  }
296  // now, on the aggregator ranks,
297  // * aggBuf is resized to accomodate all incoming data
298  // * rootCount holds how much data i hold for each root
299 
300  // Send the actual data to the aggregator
301  if (rank == myAgg) {
302  reqs.reserve(srcsPerAgg + req.nroots);
303  // receive from all ranks in my group
304  size_t displ = 0;
305  // senders will send in root order, so we will recv in that order as well
306  // this puts all data for a root contiguous in the aggregation buffer
307  for (int ri = 0; ri < req.nroots; ++ri) {
308  for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
309  // receive data for the ri'th root from the si'th sender
310  const int count = groupSendCounts[si * req.nroots + ri];
311  if (count) {
312 #ifndef NDEBUG
313  if (displ + count * sendSize > aggBuf.size()) {
314  std::stringstream ss;
315  ss << __FILE__ << ":" << __LINE__
316  << " [" << rank << "] tpetra internal Ialltofewv error: OOB access in send buffer\n";
317  std::cerr << ss.str();
318  }
319 #endif
320  MPI_Request rreq;
321  // &aggBuf(displ)
322  MPI_Irecv(aggBuf.data() + displ, count, req.sendtype, si + rank, req.tag, req.comm, &rreq);
323  reqs.push_back(rreq);
324  displ += size_t(count) * sendSize;
325  }
326  }
327  }
328  } else {
329  reqs.reserve(req.nroots); // prepare for one send per root
330  }
331 
332  // send data to aggregator
333  for (int ri = 0; ri < req.nroots; ++ri) {
334  const size_t displ = size_t(req.sdispls[ri]) * sendSize;
335  const int count = req.sendcounts[ri];
336  if (count) {
337  MPI_Request sreq;
338  MPI_Isend(&reinterpret_cast<const char *>(req.sendbuf)[displ], req.sendcounts[ri],
339  req.sendtype, myAgg, AGG_TAG, req.comm, &sreq);
340  reqs.push_back(sreq);
341  }
342  }
343 
344  MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
345  reqs.resize(0);
346 
347  // if I am a root, receive data from each aggregator
348  // The aggregator will send contiguous data, which we may need to spread out according to rdispls
349  auto rootBuf = cache.pimpl->get_rootBuf<RecvExecSpace>(0);
350  if (isRoot) {
351  reqs.reserve(naggs); // receive from each aggregator
352 
353  const size_t totalRecvd = recvSize * [&]() -> size_t {
354  size_t acc = 0;
355  for (int i = 0; i < size; ++i) {
356  acc += req.recvcounts[i];
357  }
358  return acc;
359  }();
360  rootBuf = cache.pimpl->get_rootBuf<RecvExecSpace>(totalRecvd);
361 
362  // Receive data from each aggregator.
363  // Aggregators send data in order of the ranks they're aggregating,
364  // which is also the order the root needs in its recv buffer.
365  size_t displ = 0;
366  for (int aggSrc = 0; aggSrc < size; aggSrc += srcsPerAgg) {
367  // tally up the total data to recv from the sending aggregator
368  int count = 0;
369  for (int origSrc = aggSrc;
370  origSrc < aggSrc + srcsPerAgg && origSrc < size; ++origSrc) {
371  count += req.recvcounts[origSrc];
372  }
373 
374  if (count) {
375  MPI_Request rreq;
376  MPI_Irecv(rootBuf.data() + displ, count, req.recvtype, aggSrc, ROOT_TAG, req.comm, &rreq);
377  reqs.push_back(rreq);
378  displ += size_t(count) * recvSize;
379  }
380  }
381  }
382 
383  // if I am an aggregator, forward data to the roots
384  // To each root, send my data in order of the ranks that sent to me
385  // which is the order the recvers expect
386  if (rank == myAgg) {
387  size_t displ = 0;
388  for (int ri = 0; ri < req.nroots; ++ri) {
389  const size_t count = rootCount[ri];
390  if (count) {
391  // &aggBuf[displ]
392  MPI_Send(aggBuf.data() + displ, count, req.sendtype, req.roots[ri], ROOT_TAG, req.comm);
393  displ += count * sendSize;
394  }
395  }
396  }
397 
398  MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
399 
400  // at root, copy data from contiguous buffer into recv buffer
401  if (isRoot) {
402  // set up src and dst for each block
403  auto args = cache.pimpl->get_args<RecvExecSpace>(size);
404  auto args_h = Kokkos::create_mirror_view(Kokkos::WithoutInitializing, args);
405 
406  size_t srcOff = 0;
407  for (int sRank = 0; sRank < size; ++sRank) {
408  const size_t dstOff = req.rdispls[sRank] * recvSize;
409 
410  void *dst = &reinterpret_cast<char *>(req.recvbuf)[dstOff];
411  void *const src = rootBuf.data() + srcOff; // &rootBuf(srcOff);
412  const size_t count = req.recvcounts[sRank] * recvSize;
413  args_h(sRank) = MemcpyArg{dst, src, count};
414 
415 #ifndef NDEBUG
416  if (srcOff + count > rootBuf.extent(0)) {
417  std::stringstream ss;
418  ss << __FILE__ << ":" << __LINE__ << " Tpetra internal Ialltofewv error: src access OOB in memcpy\n";
419  std::cerr << ss.str();
420  }
421 #endif
422  srcOff += count;
423  }
424 
425  // Actually copy the data
426  Kokkos::deep_copy(args, args_h);
427  using Policy = Kokkos::TeamPolicy<RecvExecSpace>;
428  Policy policy(size, Kokkos::AUTO);
429  Kokkos::parallel_for(
430  "Tpetra::Details::Ialltofewv: apply rdispls to contiguous root buffer", policy,
431  KOKKOS_LAMBDA(typename Policy::member_type member) {
432  team_memcpy(member, args(member.league_rank()));
433  });
434  Kokkos::fence("Tpetra::Details::Ialltofewv: after apply rdispls to contiguous root buffer");
435  }
436 
437  return finalize();
438 }
439 } // namespace
440 
441 int Ialltofewv::wait(Req &req) {
442  if (req.devAccess) {
443  return wait_impl<Kokkos::DefaultExecutionSpace>(req, cache_);
444  } else {
445  return wait_impl<Kokkos::DefaultHostExecutionSpace>(req, cache_);
446  }
447 }
448 
449 int Ialltofewv::get_status(const Req &req, int *flag, MPI_Status * /*status*/) const {
450  *flag = req.completed;
451  return MPI_SUCCESS;
452 }
453 
454 } // namespace Tpetra::Details
void deep_copy(MultiVector< DS, DL, DG, DN > &dst, const MultiVector< SS, SL, SG, SN > &src)
Copy the contents of the MultiVector src into dst.
void finalize()
Finalize Tpetra.