Tpetra parallel linear algebra  Version of the Day
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Tpetra_Details_iallreduce.cpp
1 // @HEADER
2 // ***********************************************************************
3 //
4 // Tpetra: Templated Linear Algebra Services Package
5 // Copyright (2008) Sandia Corporation
6 //
7 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
8 // the U.S. Government retains certain rights in this software.
9 //
10 // Redistribution and use in source and binary forms, with or without
11 // modification, are permitted provided that the following conditions are
12 // met:
13 //
14 // 1. Redistributions of source code must retain the above copyright
15 // notice, this list of conditions and the following disclaimer.
16 //
17 // 2. Redistributions in binary form must reproduce the above copyright
18 // notice, this list of conditions and the following disclaimer in the
19 // documentation and/or other materials provided with the distribution.
20 //
21 // 3. Neither the name of the Corporation nor the names of the
22 // contributors may be used to endorse or promote products derived from
23 // this software without specific prior written permission.
24 //
25 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
26 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
28 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
29 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
30 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
31 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
32 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
33 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
34 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
35 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 //
37 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
38 //
39 // ************************************************************************
40 // @HEADER
41 
43 
44 #ifdef HAVE_TPETRACORE_MPI
45 # include "Teuchos_DefaultMpiComm.hpp" // only needs to be in .cpp file
46 #endif // HAVE_TPETRACORE_MPI
47 #include "Teuchos_DefaultSerialComm.hpp" // only needs to be in .cpp file
48 
49 #ifdef HAVE_TPETRACORE_MPI
50 namespace { // (anonymous)
51 std::string getMpiErrorString (const int errCode) {
52  // Space for storing the error string returned by MPI.
53  // Leave room for null termination, since I don't know if MPI does this.
54  char errString [MPI_MAX_ERROR_STRING+1];
55  int errStringLen = MPI_MAX_ERROR_STRING; // output argument
56  (void) MPI_Error_string (errCode, errString, &errStringLen);
57  // errStringLen on output is the number of characters written.
58  // I'm not sure (the MPI 3.0 Standard doesn't say) if this
59  // includes the '\0', so I'll make sure. We reserved space for
60  // the extra '\0' if needed.
61  if (errString[errStringLen-1] != '\0') {
62  errString[errStringLen] = '\0';
63  }
64  return std::string (errString); // This copies the original string.
65 }
66 } // namespace (anonymous)
67 #endif // HAVE_TPETRACORE_MPI
68 
69 namespace Tpetra {
70 namespace Details {
71 namespace Impl {
72 
73 #ifdef HAVE_TPETRACORE_MPI
74 MpiCommRequest::
75 MpiCommRequest () :
76  req_ (MPI_REQUEST_NULL)
77 {}
78 
79 MpiCommRequest::
80 MpiCommRequest (const MPI_Request req) :
81  req_ (req)
82 {}
83 
84 void
85 MpiCommRequest::
86 waitWithStatus (MPI_Status& status)
87 {
88  if (req_ != MPI_REQUEST_NULL) {
89  MPI_Request req = req_;
90  const int err = MPI_Wait (&req, &status);
91  TEUCHOS_TEST_FOR_EXCEPTION
92  (err != MPI_SUCCESS, std::runtime_error,
93  "MpiCommRequest::waitWithStatus: MPI_Wait failed with error \""
94  << getMpiErrorString (err));
95  // MPI_Wait should set the MPI_Request to MPI_REQUEST_NULL on
96  // success. We'll do it here just to be conservative.
97  req_ = MPI_REQUEST_NULL;
98  }
99 }
100 
101 void
102 MpiCommRequest::
103 wait ()
104 {
105  if (req_ != MPI_REQUEST_NULL) {
106  MPI_Request req = req_;
107  const int err = MPI_Wait (&req, MPI_STATUS_IGNORE);
108  TEUCHOS_TEST_FOR_EXCEPTION
109  (err != MPI_SUCCESS, std::runtime_error,
110  "MpiCommRequest::wait: MPI_Wait failed with error \""
111  << getMpiErrorString (err));
112  // MPI_Wait should set the MPI_Request to MPI_REQUEST_NULL on
113  // success. We'll do it here just to be conservative.
114  req_ = MPI_REQUEST_NULL;
115  }
116 }
117 
118 void
119 MpiCommRequest::
120 cancel ()
121 {
122  if (req_ != MPI_REQUEST_NULL) {
123  const int err = MPI_Cancel (&req_);
124  TEUCHOS_TEST_FOR_EXCEPTION
125  (err != MPI_SUCCESS, std::runtime_error,
126  "MpiCommRequest::cancel: MPI_Cancel failed with the following error: "
127  << getMpiErrorString (err));
128 
129  // Wait on the request. MPI requires doing this after cancel, and
130  // promises that a wait after cancel is a local operation.
131  this->wait ();
132 
133  // The returned status may still be useful; for example, one may
134  // call MPI_Test_cancelled to test an MPI_Status from a
135  // nonblocking send. For now, we'll ignore it.
136  }
137 }
138 
139 MpiCommRequest::
140 ~MpiCommRequest ()
141 {
142  if (req_ != MPI_REQUEST_NULL) {
143  // We're in a destructor, so don't throw errors. However, if
144  // MPI_Cancel fails, it's probably a bad idea to call MPI_Wait.
145  const int err = MPI_Cancel (&req_);
146  if (err == MPI_SUCCESS) {
147  // The MPI_Cancel succeeded. Now wait on the request. Ignore
148  // any reported error, since we can't do anything about those in
149  // the destructor (other than kill the program). If successful,
150  // MPI_Wait will set the MPI_Request to MPI_REQUEST_NULL. We
151  // ignore the returned MPI_Status, since if users let the
152  // request fall out of scope, then they must not care about the
153  // status.
154  //
155  // mfh 21 Oct 2012: The MPI standard requires completing a
156  // canceled request by calling a function like MPI_Wait,
157  // MPI_Test, or MPI_Request_free. MPI_Wait on a canceled
158  // request behaves like a local operation (it does not
159  // communicate or block waiting for communication). One could
160  // also call MPI_Request_free instead of MPI_Wait, but
161  // MPI_Request_free is intended more for persistent requests
162  // (created with functions like MPI_Recv_init).
163  (void) MPI_Wait (&req_, MPI_STATUS_IGNORE);
164  }
165  }
166 }
167 
168 #endif // HAVE_TPETRACORE_MPI
169 
170 std::shared_ptr<CommRequest>
171 emptyCommRequest ()
172 {
173  return std::shared_ptr<CommRequest> (new DeferredActionCommRequest ());
174 }
175 
176 DeferredActionCommRequest::
177 DeferredActionCommRequest () :
178  action_ ([] () {}), // do nothing
179  actionTaken_ (false)
180 {}
181 
183 DeferredActionCommRequest (std::function<void () > action) :
184  action_ (action),
185  actionTaken_ (false)
186 {}
187 
188 void
191 {
192  if (! actionTaken_) {
193  action_ ();
194  actionTaken_ = true;
195  }
196 }
197 
198 void
201 {
202  actionTaken_ = true;
203 }
204 
205 #ifdef HAVE_TPETRACORE_MPI
206 
207 std::shared_ptr<CommRequest>
208 iallreduceRawVoid (const void* sendbuf,
209  void* recvbuf,
210  const int count,
211  MPI_Datatype mpiDatatype,
212  const bool mpiDatatypeNeedsFree,
213  const Teuchos::EReductionType op,
214  MPI_Comm comm)
215 {
216  MPI_Op rawOp = ::Teuchos::Details::getMpiOpForEReductionType (op);
217 
218 #if MPI_VERSION >= 3
219  const bool useMpi3 = true;
220 #else
221  const bool useMpi3 = false;
222 #endif // MPI_VERSION >= 3
223 
224  // Fix for #852: always build the fall-back (MPI_VERSION < 3)
225  // implementation.
226  if (useMpi3) {
227 #if MPI_VERSION >= 3
228  MPI_Request rawRequest = MPI_REQUEST_NULL;
229  int err = MPI_SUCCESS;
230  if (sendbuf == recvbuf) {
231  // Fix for #850. This only works if rawComm is an
232  // intracommunicator. Intercommunicators don't have an in-place
233  // option for collectives.
234  err = MPI_Iallreduce (MPI_IN_PLACE, recvbuf, count, mpiDatatype,
235  rawOp, comm, &rawRequest);
236  }
237  else {
238  err = MPI_Iallreduce (sendbuf, recvbuf, count, mpiDatatype,
239  rawOp, comm, &rawRequest);
240  }
241  TEUCHOS_TEST_FOR_EXCEPTION
242  (err != MPI_SUCCESS, std::runtime_error,
243  "MPI_Iallreduce failed with the following error: "
244  << getMpiErrorString (err));
245  if (mpiDatatypeNeedsFree) {
246  // As long as the MPI_Datatype goes into MPI_Iallreduce, it's OK
247  // to free it, even if the MPI_Iallreduce has not yet completed.
248  // There's no sense in checking the error code here.
249  (void) MPI_Type_free (&mpiDatatype);
250  }
251  return std::shared_ptr<CommRequest> (new MpiCommRequest (rawRequest));
252 #else
253  TEUCHOS_TEST_FOR_EXCEPTION
254  (true, std::logic_error, "Should never get here. "
255  "Please report this bug to the Tpetra developers.");
256 #endif // MPI_VERSION >= 3
257  }
258  else { // ! useMpi3
259  // We don't have MPI_Iallreduce. The next best thing is to defer an
260  // MPI_Allreduce call until wait. We do this by returning a
261  // "DeferredActionCommRequest," which is just a wrapped
262  // std::function.
263  //
264  // NOTE (mfh 12 Nov 2016, 14 Nov 2016) One issue with this approach
265  // is that we have to make sure that the MPI_Datatype won't go away
266  // before the MPI_Allreduce gets called. We handle this for now by
267  // calling MPI_Type_dup and stashing the destructor in the request.
268  // (Don't use the MPI_COMM_SELF trick here, unless you first check
269  // whether you've seen that MPI_Datatype before -- otherwise you'll
270  // get memory growth linear in the number of iallreduce calls.)
271  return std::shared_ptr<CommRequest> (new DeferredActionCommRequest ([=] () {
272  // It could be that this action got deferred beyond
273  // MPI_Finalize. In that case, do nothing.
274  int mpiInitialized = 0;
275  (void) MPI_Initialized (&mpiInitialized);
276  int mpiFinalized = 0;
277  (void) MPI_Finalized (&mpiFinalized);
278  if (mpiFinalized == 0 && mpiInitialized != 0) {
279  // FIXME (mfh 14 Nov 2016) Unfortunately, there is no
280  // MPI_Op_dup, so I can't guarantee that the input MPI_Op
281  // will still exist to the point where it is actually
282  // used.
283  //
284  // FIXME (mfh 14 Nov 2016) Best practice would be to
285  // duplicate the input MPI_Comm, so that we can ensure its
286  // survival to this point. However, we can't guarantee
287  // survival of the input MPI_Op, so we might as well just
288  // not bother.
289  if (mpiDatatypeNeedsFree) {
290  // Copy the original MPI_Datatype, so that we can safely
291  // defer this call past survival of the original.
292  MPI_Datatype dupDatatype;
293  (void) MPI_Type_dup (mpiDatatype, &dupDatatype);
294 #if MPI_VERSION >= 3
295  if (sendbuf == recvbuf) {
296  (void) MPI_Allreduce (MPI_IN_PLACE, recvbuf, count, dupDatatype,
297  rawOp, comm);
298  }
299  else {
300  (void) MPI_Allreduce (sendbuf, recvbuf, count, dupDatatype,
301  rawOp, comm);
302  }
303 #else // MPI_VERSION < 3
304  if (sendbuf == recvbuf) {
305  (void) MPI_Allreduce (MPI_IN_PLACE, recvbuf,
306  count, dupDatatype, rawOp, comm);
307  }
308  else {
309  // OpenMPI 1.6.5 insists on void*, not const void*, for sendbuf.
310  (void) MPI_Allreduce (const_cast<void*> (sendbuf), recvbuf,
311  count, dupDatatype, rawOp, comm);
312  }
313 #endif // MPI_VERSION >= 3
314  (void) MPI_Type_free (&dupDatatype);
315  }
316  else {
317 #if MPI_VERSION >= 3
318  if (sendbuf == recvbuf) {
319  (void) MPI_Allreduce (MPI_IN_PLACE, recvbuf, count, mpiDatatype,
320  rawOp, comm);
321  }
322  else {
323  (void) MPI_Allreduce (sendbuf, recvbuf, count, mpiDatatype,
324  rawOp, comm);
325  }
326 #else // MPI_VERSION < 3
327  if (sendbuf == recvbuf) {
328  (void) MPI_Allreduce (MPI_IN_PLACE, recvbuf,
329  count, mpiDatatype, rawOp, comm);
330  }
331  else {
332  // OpenMPI 1.6.5 insists on void*, not const void*, for sendbuf.
333  (void) MPI_Allreduce (const_cast<void*> (sendbuf), recvbuf,
334  count, mpiDatatype, rawOp, comm);
335  }
336 #endif // MPI_VERSION >= 3
337  }
338  }
339  }));
340  } // useMpi3
341 }
342 
343 #endif // HAVE_TPETRACORE_MPI
344 
345 } // namespace Impl
346 
347 std::shared_ptr<CommRequest>
348 iallreduce (const int localValue,
349  int& globalValue,
350  const ::Teuchos::EReductionType op,
351  const ::Teuchos::Comm<int>& comm)
352 {
353  using input_view_type =
354  Kokkos::View<const int*, Kokkos::HostSpace,
355  Kokkos::MemoryTraits<Kokkos::Unmanaged>>;
356  using output_view_type =
357  Kokkos::View<int*, Kokkos::HostSpace,
358  Kokkos::MemoryTraits<Kokkos::Unmanaged>>;
359 
360  input_view_type localView (&localValue, 1);
361  output_view_type globalView (&globalValue, 1);
362  return ::Tpetra::Details::iallreduce (localView, globalView, op, comm);
363 }
364 
365 } // namespace Details
366 } // namespace Tpetra
367 
368 
void wait()
Wait on this communication request to complete.
DeferredActionCommRequest()
Default constructor (take no action on wait).
void cancel()
Cancel the pending communication request, without taking the specified action.
Declaration of Tpetra::Details::iallreduce.
std::shared_ptr< CommRequest > iallreduce(const InputViewType &sendbuf, const OutputViewType &recvbuf, const ::Teuchos::EReductionType op, const ::Teuchos::Comm< int > &comm)
Nonblocking all-reduce, for either rank-1 or rank-0 Kokkos::View objects.