10 #include "Tpetra_Details_Ialltofewv.hpp"
17 #include <Kokkos_Core.hpp>
26 struct ProfilingRegion {
27 ProfilingRegion() =
delete;
28 ProfilingRegion(
const ProfilingRegion &other) =
delete;
29 ProfilingRegion(ProfilingRegion &&other) =
delete;
31 ProfilingRegion(
const std::string &name) {
32 Kokkos::Profiling::pushRegion(name);
35 Kokkos::Profiling::popRegion();
46 KOKKOS_INLINE_FUNCTION
bool is_compatible(
const MemcpyArg &arg) {
47 return (0 == (uintptr_t(arg.dst) %
sizeof(T))) && (0 == (uintptr_t(arg.src) &
sizeof(T))) && (0 == (arg.count %
sizeof(T)));
50 template <
typename T,
typename Member>
51 KOKKOS_INLINE_FUNCTION
void team_memcpy_as(
const Member &member,
void *dst,
void *
const src,
size_t count) {
53 Kokkos::TeamThreadRange(member, count),
55 reinterpret_cast<T *
>(dst)[i] = reinterpret_cast<T const *>(src)[i];
59 template <
typename Member>
60 KOKKOS_INLINE_FUNCTION
void team_memcpy(
const Member &member, MemcpyArg &arg) {
61 if (is_compatible<uint64_t>(arg)) {
62 team_memcpy_as<uint64_t>(member, arg.dst, arg.src, arg.count /
sizeof(uint64_t));
63 }
else if (is_compatible<uint32_t>(arg)) {
64 team_memcpy_as<uint32_t>(member, arg.dst, arg.src, arg.count /
sizeof(uint32_t));
66 team_memcpy_as<uint8_t>(member, arg.dst, arg.src, arg.count);
72 namespace Tpetra::Details {
74 struct Ialltofewv::Cache::impl {
76 : rootBufDev(
"rootBufDev", 0)
77 , rootBufHost(
"rootBufHost", 0)
78 , aggBufDev(
"aggBufDev", 0)
79 , aggBufHost(
"rootBufHost", 0)
80 , argsDev(
"argsDev", 0)
81 , argsHost(
"argsHost", 0)
93 , aggBufHostSize_(0) {}
96 Kokkos::View<uint8_t *, typename Kokkos::DefaultExecutionSpace::memory_space> rootBufDev;
97 Kokkos::View<uint8_t *, typename Kokkos::DefaultHostExecutionSpace::memory_space> rootBufHost;
98 Kokkos::View<char *, typename Kokkos::DefaultExecutionSpace::memory_space> aggBufDev;
99 Kokkos::View<char *, typename Kokkos::DefaultHostExecutionSpace::memory_space> aggBufHost;
100 Kokkos::View<MemcpyArg *, typename Kokkos::DefaultExecutionSpace::memory_space> argsDev;
101 Kokkos::View<MemcpyArg *, typename Kokkos::DefaultHostExecutionSpace::memory_space> argsHost;
110 size_t rootBufDevSize_, aggBufDevSize_;
111 size_t argsDevSize_, argsHostSize_;
112 size_t rootBufHostSize_, aggBufHostSize_;
114 template <
typename ExecSpace>
115 auto get_rootBuf(
size_t size) {
117 if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
118 if (rootBufDev.extent(0) < size) {
119 Kokkos::resize(Kokkos::WithoutInitializing, rootBufDev, size);
120 rootBufDevSize_ = size;
124 return Kokkos::subview(rootBufDev, Kokkos::pair{size_t(0), size});
126 if (rootBufHost.extent(0) < size) {
127 Kokkos::resize(Kokkos::WithoutInitializing, rootBufHost, size);
128 rootBufHostSize_ = size;
132 return Kokkos::subview(rootBufHost, Kokkos::pair{size_t(0), size});
136 template <
typename ExecSpace>
137 auto get_aggBuf(
size_t size) {
139 if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
140 if (aggBufDev.extent(0) < size) {
141 Kokkos::resize(Kokkos::WithoutInitializing, aggBufDev, size);
142 aggBufHostSize_ = size;
146 return Kokkos::subview(aggBufDev, Kokkos::pair{size_t(0), size});
148 if (aggBufHost.extent(0) < size) {
149 Kokkos::resize(Kokkos::WithoutInitializing, aggBufHost, size);
150 aggBufHostSize_ = size;
154 return Kokkos::subview(aggBufHost, Kokkos::pair{size_t(0), size});
158 template <
typename ExecSpace>
159 auto get_args(
size_t size) {
161 if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
162 if (argsDev.extent(0) < size) {
163 Kokkos::resize(Kokkos::WithoutInitializing, argsDev, size);
164 argsHostSize_ = size;
168 return Kokkos::subview(argsDev, Kokkos::pair{size_t(0), size});
170 if (argsHost.extent(0) < size) {
171 Kokkos::resize(Kokkos::WithoutInitializing, argsHost, size);
172 argsHostSize_ = size;
176 return Kokkos::subview(argsHost, Kokkos::pair{size_t(0), size});
181 Ialltofewv::Cache::Cache() =
default;
182 Ialltofewv::Cache::~Cache() =
default;
185 template <
typename RecvExecSpace>
186 int wait_impl(Ialltofewv::Req &req, Ialltofewv::Cache &cache) {
188 req.completed =
true;
192 if (0 == req.nroots) {
196 ProfilingRegion pr(
"alltofewv::wait");
200 cache.pimpl = std::make_shared<Ialltofewv::Cache::impl>();
203 const int rank = [&]() ->
int {
205 MPI_Comm_rank(req.comm, &_rank);
209 const int size = [&]() ->
int {
211 MPI_Comm_size(req.comm, &_size);
215 const size_t sendSize = [&]() ->
size_t {
217 MPI_Type_size(req.sendtype, &_size);
221 const size_t recvSize = [&]() ->
size_t {
223 MPI_Type_size(req.recvtype, &_size);
228 const bool isRoot = std::find(req.roots, req.roots + req.nroots, rank) != req.roots + req.nroots;
230 const int AGG_TAG = req.tag + 0;
231 const int ROOT_TAG = req.tag + 1;
240 const int naggs = std::sqrt(
size_t(size) *
size_t(req.nroots)) + 0.5;
243 const int srcsPerAgg = (size + naggs - 1) / naggs;
246 const int myAgg = rank / srcsPerAgg * srcsPerAgg;
250 std::vector<int> groupSendCounts(
size_t(req.nroots) *
size_t(srcsPerAgg));
251 std::vector<MPI_Request> reqs;
253 reqs.reserve(srcsPerAgg);
255 for (
int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
259 if (
size_t(si) * req.nroots + req.nroots > groupSendCounts.size()) {
260 std::stringstream ss;
261 ss << __FILE__ <<
":" << __LINE__
262 <<
" [" << rank <<
"] tpetra internal Ialltofewv error: OOB access in recv buffer\n";
263 std::cerr << ss.str();
266 MPI_Irecv(&groupSendCounts[
size_t(si) *
size_t(req.nroots)], req.nroots, MPI_INT, si + rank,
267 req.tag, req.comm, &rreq);
268 reqs.push_back(rreq);
272 MPI_Send(req.sendcounts, req.nroots, MPI_INT, myAgg, req.tag, req.comm);
274 MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
282 auto aggBuf = cache.pimpl->get_aggBuf<RecvExecSpace>(0);
283 std::vector<size_t> rootCount(req.nroots, 0);
286 for (
int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
287 for (
int ri = 0; ri < req.nroots; ++ri) {
288 int count = groupSendCounts[si * req.nroots + ri];
289 rootCount[ri] += count;
290 aggBytes += count * sendSize;
294 aggBuf = cache.pimpl->get_aggBuf<RecvExecSpace>(aggBytes);
302 reqs.reserve(srcsPerAgg + req.nroots);
307 for (
int ri = 0; ri < req.nroots; ++ri) {
308 for (
int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
310 const int count = groupSendCounts[si * req.nroots + ri];
313 if (displ + count * sendSize > aggBuf.size()) {
314 std::stringstream ss;
315 ss << __FILE__ <<
":" << __LINE__
316 <<
" [" << rank <<
"] tpetra internal Ialltofewv error: OOB access in send buffer\n";
317 std::cerr << ss.str();
322 MPI_Irecv(aggBuf.data() + displ, count, req.sendtype, si + rank, req.tag, req.comm, &rreq);
323 reqs.push_back(rreq);
324 displ += size_t(count) * sendSize;
329 reqs.reserve(req.nroots);
333 for (
int ri = 0; ri < req.nroots; ++ri) {
334 const size_t displ = size_t(req.sdispls[ri]) * sendSize;
335 const int count = req.sendcounts[ri];
338 MPI_Isend(&reinterpret_cast<const char *>(req.sendbuf)[displ], req.sendcounts[ri],
339 req.sendtype, myAgg, AGG_TAG, req.comm, &sreq);
340 reqs.push_back(sreq);
344 MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
349 auto rootBuf = cache.pimpl->get_rootBuf<RecvExecSpace>(0);
353 const size_t totalRecvd = recvSize * [&]() ->
size_t {
355 for (
int i = 0; i < size; ++i) {
356 acc += req.recvcounts[i];
360 rootBuf = cache.pimpl->get_rootBuf<RecvExecSpace>(totalRecvd);
366 for (
int aggSrc = 0; aggSrc < size; aggSrc += srcsPerAgg) {
369 for (
int origSrc = aggSrc;
370 origSrc < aggSrc + srcsPerAgg && origSrc < size; ++origSrc) {
371 count += req.recvcounts[origSrc];
376 MPI_Irecv(rootBuf.data() + displ, count, req.recvtype, aggSrc, ROOT_TAG, req.comm, &rreq);
377 reqs.push_back(rreq);
378 displ += size_t(count) * recvSize;
388 for (
int ri = 0; ri < req.nroots; ++ri) {
389 const size_t count = rootCount[ri];
392 MPI_Send(aggBuf.data() + displ, count, req.sendtype, req.roots[ri], ROOT_TAG, req.comm);
393 displ += count * sendSize;
398 MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
403 auto args = cache.pimpl->get_args<RecvExecSpace>(size);
404 auto args_h = Kokkos::create_mirror_view(Kokkos::WithoutInitializing, args);
407 for (
int sRank = 0; sRank < size; ++sRank) {
408 const size_t dstOff = req.rdispls[sRank] * recvSize;
410 void *dst = &
reinterpret_cast<char *
>(req.recvbuf)[dstOff];
411 void *
const src = rootBuf.data() + srcOff;
412 const size_t count = req.recvcounts[sRank] * recvSize;
413 args_h(sRank) = MemcpyArg{dst, src, count};
416 if (srcOff + count > rootBuf.extent(0)) {
417 std::stringstream ss;
418 ss << __FILE__ <<
":" << __LINE__ <<
" Tpetra internal Ialltofewv error: src access OOB in memcpy\n";
419 std::cerr << ss.str();
427 using Policy = Kokkos::TeamPolicy<RecvExecSpace>;
428 Policy policy(size, Kokkos::AUTO);
429 Kokkos::parallel_for(
430 "Tpetra::Details::Ialltofewv: apply rdispls to contiguous root buffer", policy,
431 KOKKOS_LAMBDA(
typename Policy::member_type member) {
432 team_memcpy(member, args(member.league_rank()));
434 Kokkos::fence(
"Tpetra::Details::Ialltofewv: after apply rdispls to contiguous root buffer");
441 int Ialltofewv::wait(Req &req) {
443 return wait_impl<Kokkos::DefaultExecutionSpace>(req, cache_);
445 return wait_impl<Kokkos::DefaultHostExecutionSpace>(req, cache_);
449 int Ialltofewv::get_status(
const Req &req,
int *flag, MPI_Status * )
const {
450 *flag = req.completed;
void deep_copy(MultiVector< DS, DL, DG, DN > &dst, const MultiVector< SS, SL, SG, SN > &src)
Copy the contents of the MultiVector src into dst.
void finalize()
Finalize Tpetra.