FEI  Version of the Day
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
fei_CommUtils.hpp
1 
2 /*--------------------------------------------------------------------*/
3 /* Copyright 2007 Sandia Corporation. */
4 /* Under the terms of Contract DE-AC04-94AL85000, there is a */
5 /* non-exclusive license for use of this work by or on behalf */
6 /* of the U.S. Government. Export of this program may require */
7 /* a license from the United States Government. */
8 /*--------------------------------------------------------------------*/
9 
10 #ifndef _fei_CommUtils_hpp_
11 #define _fei_CommUtils_hpp_
12 
13 #include <fei_macros.hpp>
14 #include <fei_mpi.h>
15 #include <fei_mpiTraits.hpp>
16 #include <fei_chk_mpi.hpp>
17 #include <fei_iostream.hpp>
18 #include <fei_CommMap.hpp>
19 #include <fei_TemplateUtils.hpp>
20 #include <snl_fei_RaggedTable.hpp>
21 
22 #include <vector>
23 #include <set>
24 #include <map>
25 
26 #include <fei_ErrMacros.hpp>
27 #undef fei_file
28 #define fei_file "fei_CommUtils.hpp"
29 
30 namespace fei {
31 
35 int localProc(MPI_Comm comm);
36 
40 int numProcs(MPI_Comm comm);
41 
42 void Barrier(MPI_Comm comm);
43 
49 int mirrorProcs(MPI_Comm comm, std::vector<int>& toProcs, std::vector<int>& fromProcs);
50 
51 typedef snl_fei::RaggedTable<std::map<int,std::set<int>*>,std::set<int> > comm_map;
52 
55 int mirrorCommPattern(MPI_Comm comm, comm_map* inPattern, comm_map*& outPattern);
56 
61 int Allreduce(MPI_Comm comm, bool localBool, bool& globalBool);
62 
78 int exchangeIntData(MPI_Comm comm,
79  const std::vector<int>& sendProcs,
80  std::vector<int>& sendData,
81  const std::vector<int>& recvProcs,
82  std::vector<int>& recvData);
83 
84 //----------------------------------------------------------------------------
90 template<class T>
91 int GlobalMax(MPI_Comm comm, std::vector<T>& local, std::vector<T>& global)
92 {
93 #ifdef FEI_SER
94  global = local;
95 #else
96 
97  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
98 
99  try {
100  global.resize(local.size());
101  }
102  catch(std::runtime_error& exc) {
103  fei::console_out() << exc.what()<<FEI_ENDL;
104  return(-1);
105  }
106 
107  CHK_MPI( MPI_Allreduce(&(local[0]), &(global[0]),
108  local.size(), mpi_dtype, MPI_MAX, comm) );
109 #endif
110 
111  return(0);
112 }
113 
114 //----------------------------------------------------------------------------
120 template<class T>
121 int GlobalMax(MPI_Comm comm, T local, T& global)
122 {
123 #ifdef FEI_SER
124  global = local;
125 #else
126  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
127 
128  CHK_MPI( MPI_Allreduce(&local, &global, 1, mpi_dtype, MPI_MAX, comm) );
129 #endif
130  return(0);
131 }
132 
133 //----------------------------------------------------------------------------
139 template<class T>
140 int GlobalMin(MPI_Comm comm, std::vector<T>& local, std::vector<T>& global)
141 {
142 #ifdef FEI_SER
143  global = local;
144 #else
145 
146  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
147 
148  try {
149  global.resize(local.size());
150  }
151  catch(std::runtime_error& exc) {
152  fei::console_out() << exc.what()<<FEI_ENDL;
153  return(-1);
154  }
155 
156  CHK_MPI( MPI_Allreduce(&(local[0]), &(global[0]),
157  local.size(), mpi_dtype, MPI_MIN, comm) );
158 #endif
159 
160  return(0);
161 }
162 
163 //----------------------------------------------------------------------------
169 template<class T>
170 int GlobalMin(MPI_Comm comm, T local, T& global)
171 {
172 #ifdef FEI_SER
173  global = local;
174 #else
175  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
176 
177  CHK_MPI( MPI_Allreduce(&local, &global, 1, mpi_dtype, MPI_MIN, comm) );
178 #endif
179  return(0);
180 }
181 
182 //----------------------------------------------------------------------------
188 template<class T>
189 int GlobalSum(MPI_Comm comm, std::vector<T>& local, std::vector<T>& global)
190 {
191 #ifdef FEI_SER
192  global = local;
193 #else
194  global.resize(local.size());
195 
196  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
197 
198  CHK_MPI( MPI_Allreduce(&(local[0]), &(global[0]),
199  local.size(), mpi_dtype, MPI_SUM, comm) );
200 #endif
201  return(0);
202 }
203 
204 //----------------------------------------------------------------------------
206 template<class T>
207 int GlobalSum(MPI_Comm comm, T local, T& global)
208 {
209 #ifdef FEI_SER
210  global = local;
211 #else
212  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
213 
214  CHK_MPI( MPI_Allreduce(&local, &global, 1, mpi_dtype, MPI_SUM, comm) );
215 #endif
216  return(0);
217 }
218 
219 
222 template<class T>
223 int Allgatherv(MPI_Comm comm,
224  std::vector<T>& sendbuf,
225  std::vector<int>& recvLengths,
226  std::vector<T>& recvbuf)
227 {
228 #ifdef FEI_SER
229  //If we're in serial mode, just copy sendbuf to recvbuf and return.
230 
231  recvbuf = sendbuf;
232  recvLengths.resize(1);
233  recvLengths[0] = sendbuf.size();
234 #else
235  int numProcs = 1;
236  MPI_Comm_size(comm, &numProcs);
237 
238  try {
239 
240  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
241 
242  std::vector<int> tmpInt(numProcs, 0);
243 
244  int len = sendbuf.size();
245  int* tmpBuf = &tmpInt[0];
246 
247  recvLengths.resize(numProcs);
248  int* recvLenPtr = &recvLengths[0];
249 
250  CHK_MPI( MPI_Allgather(&len, 1, MPI_INT, recvLenPtr, 1, MPI_INT, comm) );
251 
252  int displ = 0;
253  for(int i=0; i<numProcs; i++) {
254  tmpBuf[i] = displ;
255  displ += recvLenPtr[i];
256  }
257 
258  if (displ == 0) {
259  recvbuf.resize(0);
260  return(0);
261  }
262 
263  recvbuf.resize(displ);
264 
265  T* sendbufPtr = sendbuf.size()>0 ? &sendbuf[0] : NULL;
266 
267  CHK_MPI( MPI_Allgatherv(sendbufPtr, len, mpi_dtype,
268  &recvbuf[0], &recvLengths[0], tmpBuf,
269  mpi_dtype, comm) );
270 
271  }
272  catch(std::runtime_error& exc) {
273  fei::console_out() << exc.what() << FEI_ENDL;
274  return(-1);
275  }
276 #endif
277 
278  return(0);
279 }
280 
281 //------------------------------------------------------------------------
282 template<class T>
283 int Bcast(MPI_Comm comm, std::vector<T>& sendbuf, int sourceProc)
284 {
285 #ifndef FEI_SER
286  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
287 
288  CHK_MPI(MPI_Bcast(&sendbuf[0], sendbuf.size(), mpi_dtype,
289  sourceProc, comm) );
290 #endif
291  return(0);
292 }
293 
294 //------------------------------------------------------------------------
302 template<typename T>
303 int exchangeCommMapData(MPI_Comm comm,
304  const typename CommMap<T>::Type& sendCommMap,
305  typename CommMap<T>::Type& recvCommMap,
306  bool recvProcsKnownOnEntry = false,
307  bool recvLengthsKnownOnEntry = false)
308 {
309  if (!recvProcsKnownOnEntry) {
310  recvCommMap.clear();
311  }
312 
313 #ifndef FEI_SER
314  int tag = 11120;
315  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
316 
317  std::vector<int> sendProcs;
318  fei::copyKeysToVector(sendCommMap, sendProcs);
319  std::vector<int> recvProcs;
320 
321  if (recvProcsKnownOnEntry) {
322  fei::copyKeysToVector(recvCommMap, recvProcs);
323  }
324  else {
325  mirrorProcs(comm, sendProcs, recvProcs);
326  for(size_t i=0; i<recvProcs.size(); ++i) {
327  addItemsToCommMap<T>(recvProcs[i], 0, NULL, recvCommMap);
328  }
329  }
330 
331  if (!recvLengthsKnownOnEntry) {
332  std::vector<int> tmpIntData(sendProcs.size());
333  std::vector<int> recvLengths(recvProcs.size());
334 
336  s_iter = sendCommMap.begin(), s_end = sendCommMap.end();
337 
338  for(size_t i=0; s_iter != s_end; ++s_iter, ++i) {
339  tmpIntData[i] = s_iter->second.size();
340  }
341 
342  if ( exchangeIntData(comm, sendProcs, tmpIntData, recvProcs, recvLengths) != 0) {
343  return(-1);
344  }
345  for(size_t i=0; i<recvProcs.size(); ++i) {
346  std::vector<T>& rdata = recvCommMap[recvProcs[i]];
347  rdata.resize(recvLengths[i]);
348  }
349  }
350 
351  //launch Irecv's for recv-data:
352  std::vector<MPI_Request> mpiReqs;
353  mpiReqs.resize(recvProcs.size());
354 
356  r_iter = recvCommMap.begin(), r_end = recvCommMap.end();
357 
358  size_t req_offset = 0;
359  for(; r_iter != r_end; ++r_iter) {
360  int rproc = r_iter->first;
361  std::vector<T>& recv_vec = r_iter->second;
362  int len = recv_vec.size();
363  T* recv_buf = len > 0 ? &recv_vec[0] : NULL;
364 
365  CHK_MPI( MPI_Irecv(recv_buf, len, mpi_dtype, rproc,
366  tag, comm, &mpiReqs[req_offset++]) );
367  }
368 
369  //send the send-data:
370 
372  s_iter = sendCommMap.begin(), s_end = sendCommMap.end();
373 
374  for(; s_iter != s_end; ++s_iter) {
375  int sproc = s_iter->first;
376  const std::vector<T>& send_vec = s_iter->second;
377  int len = send_vec.size();
378  T* send_buf = len>0 ? const_cast<T*>(&send_vec[0]) : NULL;
379 
380  CHK_MPI( MPI_Send(send_buf, len, mpi_dtype, sproc, tag, comm) );
381  }
382 
383  //complete the Irecvs:
384  for(size_t i=0; i<mpiReqs.size(); ++i) {
385  int index;
386  MPI_Status status;
387  CHK_MPI( MPI_Waitany(mpiReqs.size(), &mpiReqs[0], &index, &status) );
388  }
389 
390 #endif
391  return(0);
392 }
393 
394 
395 //------------------------------------------------------------------------
396 template<class T>
397 int exchangeData(MPI_Comm comm,
398  std::vector<int>& sendProcs,
399  std::vector<std::vector<T> >& sendData,
400  std::vector<int>& recvProcs,
401  bool recvDataLengthsKnownOnEntry,
402  std::vector<std::vector<T> >& recvData)
403 {
404  if (sendProcs.size() == 0 && recvProcs.size() == 0) return(0);
405  if (sendProcs.size() != sendData.size()) return(-1);
406 #ifndef FEI_SER
407  std::vector<MPI_Request> mpiReqs;
408  mpiReqs.resize(recvProcs.size());
409 
410  int tag = 11119;
411  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
412 
413  if (!recvDataLengthsKnownOnEntry) {
414  std::vector<int> tmpIntData(sendData.size());
415  std::vector<int> recvLengths(recvProcs.size());
416  for(unsigned i=0; i<sendData.size(); ++i) {
417  tmpIntData[i] = sendData[i].size();
418  }
419 
420  if ( exchangeIntData(comm, sendProcs, tmpIntData, recvProcs, recvLengths) != 0) {
421  return(-1);
422  }
423  for(unsigned i=0; i<recvProcs.size(); ++i) {
424  recvData[i].resize(recvLengths[i]);
425  }
426  }
427 
428  //launch Irecv's for recvData:
429 
430  size_t numRecvProcs = recvProcs.size();
431  int req_offset = 0;
432  int localProc = fei::localProc(comm);
433  for(size_t i=0; i<recvProcs.size(); ++i) {
434  if (recvProcs[i] == localProc) {--numRecvProcs; continue; }
435 
436  int len = recvData[i].size();
437  std::vector<T>& recv_vec = recvData[i];
438  T* recv_buf = len > 0 ? &recv_vec[0] : NULL;
439 
440  CHK_MPI( MPI_Irecv(recv_buf, len, mpi_dtype, recvProcs[i],
441  tag, comm, &mpiReqs[req_offset++]) );
442  }
443 
444  //send the sendData:
445 
446  for(size_t i=0; i<sendProcs.size(); ++i) {
447  if (sendProcs[i] == localProc) continue;
448 
449  std::vector<T>& send_buf = sendData[i];
450  CHK_MPI( MPI_Send(&send_buf[0], sendData[i].size(), mpi_dtype,
451  sendProcs[i], tag, comm) );
452  }
453 
454  //complete the Irecvs:
455  for(size_t i=0; i<numRecvProcs; ++i) {
456  if (recvProcs[i] == localProc) continue;
457  int index;
458  MPI_Status status;
459  CHK_MPI( MPI_Waitany(numRecvProcs, &mpiReqs[0], &index, &status) );
460  }
461 
462 #endif
463  return(0);
464 }
465 
466 //------------------------------------------------------------------------
467 template<class T>
468 int exchangeData(MPI_Comm comm,
469  std::vector<int>& sendProcs,
470  std::vector<std::vector<T>*>& sendData,
471  std::vector<int>& recvProcs,
472  bool recvLengthsKnownOnEntry,
473  std::vector<std::vector<T>*>& recvData)
474 {
475  if (sendProcs.size() == 0 && recvProcs.size() == 0) return(0);
476  if (sendProcs.size() != sendData.size()) return(-1);
477 #ifndef FEI_SER
478  int tag = 11115;
479  MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
480  std::vector<MPI_Request> mpiReqs;
481 
482  try {
483  mpiReqs.resize(recvProcs.size());
484 
485  if (!recvLengthsKnownOnEntry) {
486  std::vector<int> tmpIntData;
487  tmpIntData.resize(sendData.size());
488  std::vector<int> recvLens(sendData.size());
489  for(unsigned i=0; i<sendData.size(); ++i) {
490  tmpIntData[i] = (int)sendData[i]->size();
491  }
492 
493  if (exchangeIntData(comm, sendProcs, tmpIntData, recvProcs, recvLens) != 0) {
494  return(-1);
495  }
496 
497  for(unsigned i=0; i<recvLens.size(); ++i) {
498  recvData[i]->resize(recvLens[i]);
499  }
500  }
501  }
502  catch(std::runtime_error& exc) {
503  fei::console_out() << exc.what() << FEI_ENDL;
504  return(-1);
505  }
506 
507  //launch Irecv's for recvData:
508 
509  size_t numRecvProcs = recvProcs.size();
510  int req_offset = 0;
511  int localProc = fei::localProc(comm);
512  for(unsigned i=0; i<recvProcs.size(); ++i) {
513  if (recvProcs[i] == localProc) {--numRecvProcs; continue;}
514 
515  size_t len = recvData[i]->size();
516  std::vector<T>& rbuf = *recvData[i];
517 
518  CHK_MPI( MPI_Irecv(&rbuf[0], (int)len, mpi_dtype,
519  recvProcs[i], tag, comm, &mpiReqs[req_offset++]) );
520  }
521 
522  //send the sendData:
523 
524  for(unsigned i=0; i<sendProcs.size(); ++i) {
525  if (sendProcs[i] == localProc) continue;
526 
527  std::vector<T>& sbuf = *sendData[i];
528  CHK_MPI( MPI_Send(&sbuf[0], (int)sbuf.size(), mpi_dtype,
529  sendProcs[i], tag, comm) );
530  }
531 
532  //complete the Irecv's:
533  for(unsigned i=0; i<numRecvProcs; ++i) {
534  if (recvProcs[i] == localProc) continue;
535  int index;
536  MPI_Status status;
537  CHK_MPI( MPI_Waitany((int)numRecvProcs, &mpiReqs[0], &index, &status) );
538  }
539 
540 #endif
541  return(0);
542 }
543 
544 
545 //------------------------------------------------------------------------
563 template<class T>
564 class MessageHandler {
565 public:
566  virtual ~MessageHandler(){}
567 
571  virtual std::vector<int>& getSendProcs() = 0;
572 
576  virtual std::vector<int>& getRecvProcs() = 0;
577 
581  virtual int getSendMessageLength(int destProc, int& messageLength) = 0;
582 
586  virtual int getSendMessage(int destProc, std::vector<T>& message) = 0;
587 
591  virtual int processRecvMessage(int srcProc, std::vector<T>& message) = 0;
592 };//class MessageHandler
593 
594 
595 //------------------------------------------------------------------------
596 template<class T>
597 int exchange(MPI_Comm comm, MessageHandler<T>* msgHandler)
598 {
599 #ifdef FEI_SER
600  (void)msgHandler;
601 #else
602  int numProcs = fei::numProcs(comm);
603  if (numProcs < 2) return(0);
604 
605  std::vector<int>& sendProcs = msgHandler->getSendProcs();
606  int numSendProcs = sendProcs.size();
607  std::vector<int>& recvProcs = msgHandler->getRecvProcs();
608  int numRecvProcs = recvProcs.size();
609  int i;
610 
611  if (numSendProcs < 1 && numRecvProcs < 1) {
612  return(0);
613  }
614 
615  std::vector<int> sendMsgLengths(numSendProcs);
616 
617  for(i=0; i<numSendProcs; ++i) {
618  CHK_ERR( msgHandler->getSendMessageLength(sendProcs[i], sendMsgLengths[i]) );
619  }
620 
621  std::vector<std::vector<T> > recvMsgs(numRecvProcs);
622 
623  std::vector<std::vector<T> > sendMsgs(numSendProcs);
624  for(i=0; i<numSendProcs; ++i) {
625  CHK_ERR( msgHandler->getSendMessage(sendProcs[i], sendMsgs[i]) );
626  }
627 
628  CHK_ERR( exchangeData(comm, sendProcs, sendMsgs,
629  recvProcs, false, recvMsgs) );
630 
631  for(i=0; i<numRecvProcs; ++i) {
632  std::vector<T>& recvMsg = recvMsgs[i];
633  CHK_ERR( msgHandler->processRecvMessage(recvProcs[i], recvMsg ) );
634  }
635 #endif
636 
637  return(0);
638 }
639 
640 
641 } //namespace fei
642 
643 #endif // _fei_CommUtils_hpp_
644 
virtual int getSendMessageLength(int destProc, int &messageLength)=0
int GlobalSum(MPI_Comm comm, std::vector< T > &local, std::vector< T > &global)
int Allgatherv(MPI_Comm comm, std::vector< T > &sendbuf, std::vector< int > &recvLengths, std::vector< T > &recvbuf)
int mirrorCommPattern(MPI_Comm comm, comm_map *inPattern, comm_map *&outPattern)
virtual std::vector< int > & getRecvProcs()=0
virtual int getSendMessage(int destProc, std::vector< T > &message)=0
virtual int processRecvMessage(int srcProc, std::vector< T > &message)=0
int exchangeCommMapData(MPI_Comm comm, const typename CommMap< T >::Type &sendCommMap, typename CommMap< T >::Type &recvCommMap, bool recvProcsKnownOnEntry=false, bool recvLengthsKnownOnEntry=false)
void copyKeysToVector(const MAP_TYPE &map_obj, std::vector< int > &keyvector)
int mirrorProcs(MPI_Comm comm, std::vector< int > &toProcs, std::vector< int > &fromProcs)
virtual std::vector< int > & getSendProcs()=0
int GlobalMin(MPI_Comm comm, std::vector< T > &local, std::vector< T > &global)
int Allreduce(MPI_Comm comm, bool localBool, bool &globalBool)
std::ostream & console_out()
int GlobalMax(MPI_Comm comm, std::vector< T > &local, std::vector< T > &global)
int exchangeIntData(MPI_Comm comm, const std::vector< int > &sendProcs, std::vector< int > &sendData, const std::vector< int > &recvProcs, std::vector< int > &recvData)
int localProc(MPI_Comm comm)
int numProcs(MPI_Comm comm)