Tpetra_Details_Ialltofewv.cpp
// @HEADER
// *****************************************************************************
// Tpetra: Templated Linear Algebra Services Package
//
// Copyright 2008 NTESS and the Tpetra contributors.
// SPDX-License-Identifier: BSD-3-Clause
// *****************************************************************************
// @HEADER

#include "Tpetra_Details_Ialltofewv.hpp"

#include <algorithm>
#include <cmath>
#include <vector>

#include <mpi.h>
#include <Kokkos_Core.hpp>

#ifndef NDEBUG
#include <iostream>
#include <sstream>
#endif

namespace {

// RAII wrapper around a Kokkos profiling region: pushed on construction,
// popped on destruction.
struct ProfilingRegion {
  ProfilingRegion() = delete;
  ProfilingRegion(const ProfilingRegion &other) = delete;
  ProfilingRegion(ProfilingRegion &&other) = delete;

  ProfilingRegion(const std::string &name) {
    Kokkos::Profiling::pushRegion(name);
  }
  ~ProfilingRegion() {
    Kokkos::Profiling::popRegion();
  }
};

// Describes one contiguous copy: `count` bytes from `src` to `dst`.
struct MemcpyArg {
  void *dst;
  void *src;
  size_t count;
};

// True iff both pointers are aligned for T and the byte count is a whole
// number of T elements, so the copy can be done as T-sized loads/stores.
template <typename T>
KOKKOS_INLINE_FUNCTION bool is_compatible(const MemcpyArg &arg) {
  return (0 == (uintptr_t(arg.dst) % sizeof(T)))
      && (0 == (uintptr_t(arg.src) % sizeof(T)))
      && (0 == (arg.count % sizeof(T)));
}

// Copy `count` elements of type T from src to dst, parallelized across the team.
template <typename T, typename Member>
KOKKOS_INLINE_FUNCTION void team_memcpy_as(const Member &member, void *dst, const void *src, size_t count) {
  Kokkos::parallel_for(
    Kokkos::TeamThreadRange(member, count),
    [&](size_t i) {
      reinterpret_cast<T *>(dst)[i] = reinterpret_cast<const T *>(src)[i];
    }
  );
}

// Copy arg.count bytes using the widest element type the alignment allows.
template <typename Member>
KOKKOS_INLINE_FUNCTION void team_memcpy(const Member &member, const MemcpyArg &arg) {
  if (is_compatible<uint64_t>(arg)) {
    team_memcpy_as<uint64_t>(member, arg.dst, arg.src, arg.count / sizeof(uint64_t));
  } else if (is_compatible<uint32_t>(arg)) {
    team_memcpy_as<uint32_t>(member, arg.dst, arg.src, arg.count / sizeof(uint32_t));
  } else {
    team_memcpy_as<uint8_t>(member, arg.dst, arg.src, arg.count);
  }
}
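
// For example, a 24-byte block whose dst and src are both 8-byte aligned is
// copied as three uint64_t elements; a 10-byte block, or one with misaligned
// pointers, falls back to a byte-wise copy.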

} // namespace

namespace Tpetra::Details {

struct Ialltofewv::Cache::impl {

  impl() :
    rootBufDev("rootBufDev"), rootBufHost("rootBufHost"),
    aggBufDev("aggBufDev"), aggBufHost("aggBufHost"),
    argsDev("argsDev"), argsHost("argsHost"),
    rootBufGets_(0), rootBufHits_(0),
    aggBufGets_(0), aggBufHits_(0),
    argsGets_(0), argsHits_(0),
    rootBufDevSize_(0), aggBufDevSize_(0),
    argsDevSize_(0), argsHostSize_(0),
    rootBufHostSize_(0), aggBufHostSize_(0)
  {}

  // cached views
  Kokkos::View<uint8_t *, typename Kokkos::DefaultExecutionSpace::memory_space> rootBufDev;
  Kokkos::View<uint8_t *, typename Kokkos::DefaultHostExecutionSpace::memory_space> rootBufHost;
  Kokkos::View<char *, typename Kokkos::DefaultExecutionSpace::memory_space> aggBufDev;
  Kokkos::View<char *, typename Kokkos::DefaultHostExecutionSpace::memory_space> aggBufHost;
  Kokkos::View<MemcpyArg *, typename Kokkos::DefaultExecutionSpace::memory_space> argsDev;
  Kokkos::View<MemcpyArg *, typename Kokkos::DefaultHostExecutionSpace::memory_space> argsHost;

  // get/hit counters: a "hit" is a get_* call satisfied by the existing allocation
  size_t rootBufGets_;
  size_t rootBufHits_;
  size_t aggBufGets_;
  size_t aggBufHits_;
  size_t argsGets_;
  size_t argsHits_;

  size_t rootBufDevSize_, aggBufDevSize_;
  size_t argsDevSize_, argsHostSize_;
  size_t rootBufHostSize_, aggBufHostSize_;

  // Return a view of at least `size` bytes, growing the cached allocation
  // only when needed.
  template <typename ExecSpace>
  auto get_rootBuf(size_t size) {
    ++rootBufGets_;
    if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
      if (rootBufDev.extent(0) < size) {
        Kokkos::resize(Kokkos::WithoutInitializing, rootBufDev, size);
        rootBufDevSize_ = size;
      } else {
        ++rootBufHits_;
      }
      return Kokkos::subview(rootBufDev, Kokkos::pair{size_t(0), size});
    } else {
      if (rootBufHost.extent(0) < size) {
        Kokkos::resize(Kokkos::WithoutInitializing, rootBufHost, size);
        rootBufHostSize_ = size;
      } else {
        ++rootBufHits_;
      }
      return Kokkos::subview(rootBufHost, Kokkos::pair{size_t(0), size});
    }
  }

  template <typename ExecSpace>
  auto get_aggBuf(size_t size) {
    ++aggBufGets_;
    if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
      if (aggBufDev.extent(0) < size) {
        Kokkos::resize(Kokkos::WithoutInitializing, aggBufDev, size);
        aggBufDevSize_ = size;
      } else {
        ++aggBufHits_;
      }
      return Kokkos::subview(aggBufDev, Kokkos::pair{size_t(0), size});
    } else {
      if (aggBufHost.extent(0) < size) {
        Kokkos::resize(Kokkos::WithoutInitializing, aggBufHost, size);
        aggBufHostSize_ = size;
      } else {
        ++aggBufHits_;
      }
      return Kokkos::subview(aggBufHost, Kokkos::pair{size_t(0), size});
    }
  }

  template <typename ExecSpace>
  auto get_args(size_t size) {
    ++argsGets_;
    if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
      if (argsDev.extent(0) < size) {
        Kokkos::resize(Kokkos::WithoutInitializing, argsDev, size);
        argsDevSize_ = size;
      } else {
        ++argsHits_;
      }
      return Kokkos::subview(argsDev, Kokkos::pair{size_t(0), size});
    } else {
      if (argsHost.extent(0) < size) {
        Kokkos::resize(Kokkos::WithoutInitializing, argsHost, size);
        argsHostSize_ = size;
      } else {
        ++argsHits_;
      }
      return Kokkos::subview(argsHost, Kokkos::pair{size_t(0), size});
    }
  }

};

Ialltofewv::Cache::Cache() = default;
Ialltofewv::Cache::~Cache() = default;

namespace {
template <typename RecvExecSpace>
int wait_impl(Ialltofewv::Req &req, Ialltofewv::Cache &cache) {
  auto finalize = [&]() -> int {
    req.completed = true;
    return MPI_SUCCESS;
  };

  if (0 == req.nroots) {
    return finalize();
  }

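  // Overview: rather than every rank sending directly to each root, the
  // exchange goes through intermediate aggregators:
  //   1. each rank tells its aggregator how much it sends to each root,
  //   2. each rank sends its data to its aggregator,
  //   3. each aggregator forwards one contiguous block per root,
  //   4. each root scatters the contiguous data according to rdispls.
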
  ProfilingRegion pr("alltofewv::wait");

  // lazy-init view cache
  if (!cache.pimpl) {
    cache.pimpl = std::make_shared<Ialltofewv::Cache::impl>();
  }

  const int rank = [&]() -> int {
    int _rank;
    MPI_Comm_rank(req.comm, &_rank);
    return _rank;
  }();

  const int size = [&]() -> int {
    int _size;
    MPI_Comm_size(req.comm, &_size);
    return _size;
  }();

  const size_t sendSize = [&]() -> size_t {
    int _size;
    MPI_Type_size(req.sendtype, &_size);
    return _size;
  }();

  const size_t recvSize = [&]() -> size_t {
    int _size;
    MPI_Type_size(req.recvtype, &_size);
    return _size;
  }();

  // is this rank a root? linear search - nroots is expected to be small
  const bool isRoot = std::find(req.roots, req.roots + req.nroots, rank) != req.roots + req.nroots;

  const int AGG_TAG = req.tag + 0;
  const int ROOT_TAG = req.tag + 1;
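  // Note: AGG_TAG equals req.tag, the tag also used for the count exchange
  // below. Matching stays unambiguous because each receiver waits for all
  // count messages before posting data receives, and MPI guarantees
  // non-overtaking order between a given sender/receiver pair.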

  // Balance the number of incoming messages at each phase:
  //   Aggregation = size / naggs * nroots
  //   Root        = naggs
  // so
  //   size / naggs * nroots = naggs
  //   size * nroots = naggs^2
  //   naggs = sqrt(size * nroots)
  const int naggs = std::sqrt(size_t(size) * size_t(req.nroots)) + /*rounding*/ 0.5;

  // how many srcs go to each aggregator
  const int srcsPerAgg = (size + naggs - 1) / naggs;

  // the aggregator I send to: the lowest rank in my group
  const int myAgg = rank / srcsPerAgg * srcsPerAgg;

  // ensure aggregators know how much data each rank is sending to the root
  // [si * nroots + ri] -> how much the si'th rank in my group wants to send to root ri
  std::vector<int> groupSendCounts(size_t(req.nroots) * size_t(srcsPerAgg));
  std::vector<MPI_Request> reqs;
  if (rank == myAgg) {
    reqs.reserve(srcsPerAgg);
    // recv counts from each member of my group
    for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
      MPI_Request rreq;

#ifndef NDEBUG
      if (size_t(si) * req.nroots + req.nroots > groupSendCounts.size()) {
        std::stringstream ss;
        ss << __FILE__ << ":" << __LINE__
           << " [" << rank << "] tpetra internal Ialltofewv error: OOB access in recv buffer\n";
        std::cerr << ss.str();
      }
#endif
      MPI_Irecv(&groupSendCounts[size_t(si) * size_t(req.nroots)], req.nroots, MPI_INT, si + rank,
                req.tag, req.comm, &rreq);
      reqs.push_back(rreq);
    }
  }
  // send my sendcounts to my aggregator
  MPI_Send(req.sendcounts, req.nroots, MPI_INT, myAgg, req.tag, req.comm);

  MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
  reqs.resize(0);

  // at this point, in each aggregator, groupSendCounts holds the send counts
  // from each member of the aggregator's group: the first nroots entries are from
  // the first rank, the second nroots entries are from the second rank, etc.
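  // For example, with srcsPerAgg = 2 and nroots = 2, groupSendCounts is
  //   { s0->r0, s0->r1, s1->r0, s1->r1 }
  // where si is the i'th rank of the group and ri the i'th root.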

  // a temporary buffer to aggregate data. Data for a root is contiguous.
  auto aggBuf = cache.pimpl->get_aggBuf<RecvExecSpace>(0);
  std::vector<size_t> rootCount(req.nroots, 0); // [ri] the count of data held for root ri
  if (rank == myAgg) {
    size_t aggBytes = 0;
    for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
      for (int ri = 0; ri < req.nroots; ++ri) {
        const int count = groupSendCounts[si * req.nroots + ri];
        rootCount[ri] += count;
        aggBytes += count * sendSize;
      }
    }

    aggBuf = cache.pimpl->get_aggBuf<RecvExecSpace>(aggBytes);
  }
  // now, on the aggregator ranks,
  // * aggBuf is resized to accommodate all incoming data
  // * rootCount holds how much data I hold for each root

  // Send the actual data to the aggregator
  if (rank == myAgg) {
    reqs.reserve(srcsPerAgg + req.nroots);
    // receive from all ranks in my group
    size_t displ = 0;
    // senders send in root order, so we recv in that order as well;
    // this makes all data for a given root contiguous in the aggregation buffer
    for (int ri = 0; ri < req.nroots; ++ri) {
      for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
        // receive data for the ri'th root from the si'th sender
        const int count = groupSendCounts[si * req.nroots + ri];
        if (count) {
#ifndef NDEBUG
          if (displ + count * sendSize > aggBuf.size()) {
            std::stringstream ss;
            ss << __FILE__ << ":" << __LINE__
               << " [" << rank << "] tpetra internal Ialltofewv error: OOB access in send buffer\n";
            std::cerr << ss.str();
          }
#endif
          MPI_Request rreq;
          // i.e., &aggBuf(displ); AGG_TAG matches the tag used by the senders below
          MPI_Irecv(aggBuf.data() + displ, count, req.sendtype, si + rank, AGG_TAG, req.comm, &rreq);
          reqs.push_back(rreq);
          displ += size_t(count) * sendSize;
        }
      }
    }
  } else {
    reqs.reserve(req.nroots); // prepare for one send per root
  }

  // send my data to my aggregator, one message per root
  for (int ri = 0; ri < req.nroots; ++ri) {
    const size_t displ = size_t(req.sdispls[ri]) * sendSize;
    const int count = req.sendcounts[ri];
    if (count) {
      MPI_Request sreq;
      MPI_Isend(&reinterpret_cast<const char *>(req.sendbuf)[displ], count,
                req.sendtype, myAgg, AGG_TAG, req.comm, &sreq);
      reqs.push_back(sreq);
    }
  }

  MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
  reqs.resize(0);

  // if I am a root, receive data from each aggregator.
  // The aggregator sends contiguous data, which we may need to spread out according to rdispls
  auto rootBuf = cache.pimpl->get_rootBuf<RecvExecSpace>(0);
  if (isRoot) {
    reqs.reserve(naggs); // receive from each aggregator

    const size_t totalRecvd = recvSize * [&]() -> size_t {
      size_t acc = 0;
      for (int i = 0; i < size; ++i) {
        acc += req.recvcounts[i];
      }
      return acc;
    }();
    rootBuf = cache.pimpl->get_rootBuf<RecvExecSpace>(totalRecvd);

    // Receive data from each aggregator.
    // Aggregators send data in order of the ranks they're aggregating,
    // which is also the order the root needs in its recv buffer.
    size_t displ = 0;
    for (int aggSrc = 0; aggSrc < size; aggSrc += srcsPerAgg) {

      // tally up the total data to recv from the sending aggregator
      int count = 0;
      for (int origSrc = aggSrc;
           origSrc < aggSrc + srcsPerAgg && origSrc < size; ++origSrc) {
        count += req.recvcounts[origSrc];
      }

      if (count) {
        MPI_Request rreq;
        MPI_Irecv(rootBuf.data() + displ, count, req.recvtype, aggSrc, ROOT_TAG, req.comm, &rreq);
        reqs.push_back(rreq);
        displ += size_t(count) * recvSize;
      }
    }
  }

  // if I am an aggregator, forward data to the roots.
  // To each root, send my data in order of the ranks that sent to me,
  // which is the order the receivers expect.
  if (rank == myAgg) {
    size_t displ = 0;
    for (int ri = 0; ri < req.nroots; ++ri) {
      const size_t count = rootCount[ri];
      if (count) {
        // i.e., &aggBuf(displ)
        MPI_Send(aggBuf.data() + displ, count, req.sendtype, req.roots[ri], ROOT_TAG, req.comm);
        displ += count * sendSize;
      }
    }
  }

  MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);

  // at root, copy data from the contiguous buffer into the recv buffer
  if (isRoot) {

    // set up src and dst for each block
    auto args = cache.pimpl->get_args<RecvExecSpace>(size);
    auto args_h = Kokkos::create_mirror_view(Kokkos::WithoutInitializing, args);

    size_t srcOff = 0;
    for (int sRank = 0; sRank < size; ++sRank) {
      const size_t dstOff = req.rdispls[sRank] * recvSize;

      void *dst = &reinterpret_cast<char *>(req.recvbuf)[dstOff];
      void *const src = rootBuf.data() + srcOff; // i.e., &rootBuf(srcOff)
      const size_t count = req.recvcounts[sRank] * recvSize;
      args_h(sRank) = MemcpyArg{dst, src, count};

#ifndef NDEBUG
      if (srcOff + count > rootBuf.extent(0)) {
        std::stringstream ss;
        ss << __FILE__ << ":" << __LINE__ << " Tpetra internal Ialltofewv error: src access OOB in memcpy\n";
        std::cerr << ss.str();
      }
#endif
      srcOff += count;
    }
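
    // For example, if rank 0 contributed 3 elements and rank 1 contributed 2,
    // rootBuf holds them back to back; the copies place them at
    // recvbuf + rdispls[0] * recvSize and recvbuf + rdispls[1] * recvSize.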

    // Actually copy the data: one team per source rank
    Kokkos::deep_copy(args, args_h);
    using Policy = Kokkos::TeamPolicy<RecvExecSpace>;
    Policy policy(size, Kokkos::AUTO);
    Kokkos::parallel_for("Tpetra::Details::Ialltofewv: apply rdispls to contiguous root buffer", policy,
      KOKKOS_LAMBDA(typename Policy::member_type member) {
        team_memcpy(member, args(member.league_rank()));
      }
    );
    Kokkos::fence("Tpetra::Details::Ialltofewv: after apply rdispls to contiguous root buffer");

  }

  return finalize();
}
} // namespace


int Ialltofewv::wait(Req &req) {
  if (req.devAccess) {
    return wait_impl<Kokkos::DefaultExecutionSpace>(req, cache_);
  } else {
    return wait_impl<Kokkos::DefaultHostExecutionSpace>(req, cache_);
  }
}

int Ialltofewv::get_status(const Req &req, int *flag, MPI_Status * /*status*/) const {
  *flag = req.completed;
  return MPI_SUCCESS;
}

} // namespace Tpetra::Details
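
// Usage sketch (hypothetical, for illustration only; Req is declared in
// Tpetra_Details_Ialltofewv.hpp and may be populated differently there).
// Assuming Req simply aggregates the MPI-style arguments referenced above,
// a caller gathering data onto two roots might look like:
//
//   Tpetra::Details::Ialltofewv collective;
//   Tpetra::Details::Ialltofewv::Req req;
//   req.comm = MPI_COMM_WORLD;
//   req.tag = 0;
//   req.nroots = 2;
//   req.roots = roots;            // e.g., int roots[2] = {0, 1};
//   req.sendbuf = sendbuf;        // data to contribute
//   req.sendcounts = sendcounts;  // one count per root
//   req.sdispls = sdispls;        // one displacement per root
//   req.sendtype = MPI_DOUBLE;
//   req.recvbuf = recvbuf;        // significant only on roots
//   req.recvcounts = recvcounts;  // one count per source rank
//   req.rdispls = rdispls;        // one displacement per source rank
//   req.recvtype = MPI_DOUBLE;
//   req.devAccess = true;         // buffers are device-accessible
//   collective.wait(req);         // blocks until the exchange completes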