10 #ifndef _fei_CommUtils_hpp_
11 #define _fei_CommUtils_hpp_
13 #include <fei_macros.hpp>
15 #include <fei_mpiTraits.hpp>
16 #include <fei_chk_mpi.hpp>
17 #include <fei_iostream.hpp>
18 #include <fei_CommMap.hpp>
19 #include <fei_TemplateUtils.hpp>
20 #include <snl_fei_RaggedTable.hpp>
26 #include <fei_ErrMacros.hpp>
28 #define fei_file "fei_CommUtils.hpp"
// Declaration only; the definition is not part of this listing.
// Presumably blocks until every process in `comm` has reached the call -- TODO confirm
// against the implementation.
42 void Barrier(MPI_Comm comm);
// Declaration only; the definition is not part of this listing.
// Takes a communicator, a list `toProcs` and an output-style list `fromProcs` (both passed
// by non-const reference). Presumably `fromProcs` receives the ranks that name this rank in
// their own `toProcs` -- TODO confirm against the implementation.
49 int mirrorProcs(MPI_Comm comm, std::vector<int>& toProcs, std::vector<int>& fromProcs);
// Declaration only: reduces a per-process boolean (`localBool`) into `globalBool`, which is
// written through a reference. The combining rule (AND / OR / ...) is not visible in this
// listing -- check the definition before relying on it.
61 int Allreduce(MPI_Comm comm,
bool localBool,
bool& globalBool);
// Tail of a declaration's parameter list; the line carrying the return type and name is
// absent from this listing. The member index at the end of the listing shows an
// `exchangeIntData(MPI_Comm, ...)` signature with exactly these parameters, so this is
// presumably that declaration.
//   sendProcs / sendData : destination ranks and the int payload for them (sendData is
//                          non-const even though it looks like an input -- reason not visible)
//   recvProcs / recvData : source ranks and the buffer the received ints are written to
// Callers in this file pass one int per entry of sendProcs and expect one int per entry of
// recvProcs, and treat a non-zero return as failure.
79 const std::vector<int>& sendProcs,
80 std::vector<int>& sendData,
81 const std::vector<int>& recvProcs,
82 std::vector<int>& recvData);
// Element-wise global maximum of a vector across `comm`.
// `global` is resized to local.size() and then filled by MPI_Allreduce with MPI_MAX over
// local.size() entries; the MPI datatype is looked up through fei::mpiTraits<T>.
// NOTE(review): this listing skips original lines (template header, braces, any serial
// branch, the return), so only the visible statements are described here.
// NOTE(review): &(local[0]) and &(global[0]) are formed unconditionally in the visible
// lines; with an empty `local` that indexes an empty vector -- confirm whether a guard
// exists in the missing lines.
91 int GlobalMax(MPI_Comm comm, std::vector<T>& local, std::vector<T>& global)
97 MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
100 global.resize(local.size());
// A std::runtime_error is caught here; the guarded statements' exact extent and the
// handler body are not visible.
102 catch(std::runtime_error& exc) {
107 CHK_MPI( MPI_Allreduce(&(local[0]), &(global[0]),
108 local.size(), mpi_dtype, MPI_MAX, comm) );
// Scalar overload: global maximum of a single value of type T across `comm`.
// `local` is taken by value and `global` receives the MPI_MAX result of a one-element
// MPI_Allreduce. The return statement and any non-MPI branch are not visible in this listing.
121 int GlobalMax(MPI_Comm comm, T local, T& global)
126 MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
128 CHK_MPI( MPI_Allreduce(&local, &global, 1, mpi_dtype, MPI_MAX, comm) );
// Element-wise global minimum of a vector across `comm`.
// `global` is resized to local.size() and then filled by MPI_Allreduce with MPI_MIN over
// local.size() entries; the MPI datatype is looked up through fei::mpiTraits<T>.
// NOTE(review): original lines are missing from this listing (template header, braces,
// return), so only the visible statements are described.
// NOTE(review): &(local[0]) / &(global[0]) are formed unconditionally in the visible lines;
// an empty `local` would index an empty vector -- confirm whether a guard exists.
140 int GlobalMin(MPI_Comm comm, std::vector<T>& local, std::vector<T>& global)
146 MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
149 global.resize(local.size());
// A std::runtime_error is caught here; the handler body is not visible.
151 catch(std::runtime_error& exc) {
156 CHK_MPI( MPI_Allreduce(&(local[0]), &(global[0]),
157 local.size(), mpi_dtype, MPI_MIN, comm) );
// Scalar overload: global minimum of a single value of type T across `comm`.
// `local` is taken by value and `global` receives the MPI_MIN result of a one-element
// MPI_Allreduce. The return statement and any non-MPI branch are not visible in this listing.
170 int GlobalMin(MPI_Comm comm, T local, T& global)
175 MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
177 CHK_MPI( MPI_Allreduce(&local, &global, 1, mpi_dtype, MPI_MIN, comm) );
// Element-wise global sum of a vector across `comm`.
// `global` is resized to local.size() and then filled by MPI_Allreduce with MPI_SUM over
// local.size() entries. Unlike the vector GlobalMax/GlobalMin listings, no catch clause is
// visible around the resize here (it may simply be among the missing lines).
// NOTE(review): &(local[0]) / &(global[0]) are formed unconditionally in the visible lines;
// an empty `local` would index an empty vector -- confirm whether a guard exists.
189 int GlobalSum(MPI_Comm comm, std::vector<T>& local, std::vector<T>& global)
194 global.resize(local.size());
196 MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
198 CHK_MPI( MPI_Allreduce(&(local[0]), &(global[0]),
199 local.size(), mpi_dtype, MPI_SUM, comm) );
// Scalar overload: global sum of a single value of type T across `comm`.
// `local` is taken by value and `global` receives the MPI_SUM result of a one-element
// MPI_Allreduce. The return statement and any non-MPI branch are not visible in this listing.
207 int GlobalSum(MPI_Comm comm, T local, T& global)
212 MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
214 CHK_MPI( MPI_Allreduce(&local, &global, 1, mpi_dtype, MPI_SUM, comm) );
// Parameter-list tail and partial body of a gather-to-all routine. The line with the
// function name is absent; the member index at the end of this listing shows
// `Allgatherv(MPI_Comm, std::vector<T>& sendbuf, std::vector<int>& recvLengths,
// std::vector<T>& recvbuf)`, which matches these parameters.
//   sendbuf     : this process's contribution
//   recvLengths : on return, one length per process (see the MPI_Allgather below)
//   recvbuf     : on return, the concatenated contributions
// NOTE(review): many original lines are missing between the visible ones (the embedded line
// numbers skip), so branch structure and loop headers are not shown.
224 std::vector<T>& sendbuf,
225 std::vector<int>& recvLengths,
226 std::vector<T>& recvbuf)
// Single-entry form: recvLengths holds just the local send length. This presumably belongs
// to a non-MPI build branch, since recvLengths is resized to numProcs further down -- confirm.
232 recvLengths.resize(1);
233 recvLengths[0] = sendbuf.size();
236 MPI_Comm_size(comm, &numProcs);
240 MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
// tmpInt is later handed to MPI_Allgatherv in the displacements position (via tmpBuf);
// the statement that fills it is not visible here.
242 std::vector<int> tmpInt(numProcs, 0);
244 int len = sendbuf.size();
245 int* tmpBuf = &tmpInt[0];
247 recvLengths.resize(numProcs);
248 int* recvLenPtr = &recvLengths[0];
// First collective: every process learns every other process's send length.
250 CHK_MPI( MPI_Allgather(&len, 1, MPI_INT, recvLenPtr, 1, MPI_INT, comm) );
// Running total of the gathered lengths; the enclosing loop header is not visible.
255 displ += recvLenPtr[i];
263 recvbuf.resize(displ);
// An empty send buffer is passed to MPI as NULL rather than indexing element 0.
265 T* sendbufPtr = sendbuf.size()>0 ? &sendbuf[0] : NULL;
// NOTE(review): &recvbuf[0] is formed without the same emptiness guard; if the total
// length `displ` is 0 this indexes an empty vector -- confirm.
267 CHK_MPI( MPI_Allgatherv(sendbufPtr, len, mpi_dtype,
268 &recvbuf[0], &recvLengths[0], tmpBuf,
272 catch(std::runtime_error& exc) {
// Broadcast of a vector of T over `comm`; `sourceProc` is presumably the root rank passed
// to MPI_Bcast (the remaining arguments of the call are cut off in this listing -- confirm).
// The visible call broadcasts sendbuf.size() elements starting at &sendbuf[0]; no resize of
// `sendbuf` is visible, so callers presumably size it identically on every rank -- confirm.
// NOTE(review): &sendbuf[0] is formed unconditionally; an empty `sendbuf` would index an
// empty vector unless a guard exists in the missing lines.
283 int Bcast(MPI_Comm comm, std::vector<T>& sendbuf,
int sourceProc)
286 MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
288 CHK_MPI(MPI_Bcast(&sendbuf[0], sendbuf.size(), mpi_dtype,
// Parameter-list tail and partial body of a map-based point-to-point exchange. The line
// with the function name is absent; the member index at the end of this listing shows
// `exchangeCommMapData(MPI_Comm, const CommMap<T>::Type& sendCommMap,
// CommMap<T>::Type& recvCommMap, bool recvProcsKnownOnEntry=false,
// bool recvLengthsKnownOnEntry=false)`, which matches these parameters.
//   sendCommMap             : per-destination-rank data to send (key = rank, value = vector<T>,
//                             as used by s_iter->first / s_iter->second below)
//   recvCommMap             : per-source-rank receive buffers, filled by this routine
//   recvProcsKnownOnEntry   : when true, the source ranks are taken as already known
//   recvLengthsKnownOnEntry : when true, the length-exchange phase is skipped and the
//                             existing sizes of the recvCommMap vectors are used
// NOTE(review): many original lines are missing (the embedded line numbers skip), so the
// contents of several branches and the declarations of `tag`, `index`, `status`, `s_iter`
// and `r_iter` are not shown.
304 const typename CommMap<T>::Type& sendCommMap,
305 typename CommMap<T>::Type& recvCommMap,
306 bool recvProcsKnownOnEntry =
false,
307 bool recvLengthsKnownOnEntry =
false)
309 if (!recvProcsKnownOnEntry) {
315 MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
317 std::vector<int> sendProcs;
319 std::vector<int> recvProcs;
321 if (recvProcsKnownOnEntry) {
// Registers each rank in recvProcs in recvCommMap with zero items (count 0, NULL data).
// Which branch of the test above this loop belongs to is not visible in the listing.
326 for(
size_t i=0; i<recvProcs.size(); ++i) {
327 addItemsToCommMap<T>(recvProcs[i], 0, NULL, recvCommMap);
// Length-exchange phase: send each destination the number of items it will receive, and
// size the local receive vectors from what the sources report.
331 if (!recvLengthsKnownOnEntry) {
332 std::vector<int> tmpIntData(sendProcs.size());
333 std::vector<int> recvLengths(recvProcs.size());
336 s_iter = sendCommMap.begin(), s_end = sendCommMap.end();
// NOTE(review): tmpIntData is indexed by map-iteration position, so it relies on sendProcs
// having one entry per sendCommMap entry in the same order -- how sendProcs is filled is
// not visible here.
338 for(
size_t i=0; s_iter != s_end; ++s_iter, ++i) {
339 tmpIntData[i] = s_iter->second.size();
// A non-zero result from exchangeIntData is treated as failure; the handling is not visible.
342 if (
exchangeIntData(comm, sendProcs, tmpIntData, recvProcs, recvLengths) != 0) {
345 for(
size_t i=0; i<recvProcs.size(); ++i) {
346 std::vector<T>& rdata = recvCommMap[recvProcs[i]];
347 rdata.resize(recvLengths[i]);
// Receive phase: one non-blocking receive per recvCommMap entry.
// NOTE(review): mpiReqs is sized from recvProcs.size() but indexed once per recvCommMap
// entry; this is only in bounds if the map has no more entries than recvProcs -- confirm.
352 std::vector<MPI_Request> mpiReqs;
353 mpiReqs.resize(recvProcs.size());
356 r_iter = recvCommMap.begin(), r_end = recvCommMap.end();
358 size_t req_offset = 0;
359 for(; r_iter != r_end; ++r_iter) {
360 int rproc = r_iter->first;
361 std::vector<T>& recv_vec = r_iter->second;
362 int len = recv_vec.size();
// Zero-length receives pass NULL instead of indexing an empty vector.
363 T* recv_buf = len > 0 ? &recv_vec[0] : NULL;
365 CHK_MPI( MPI_Irecv(recv_buf, len, mpi_dtype, rproc,
366 tag, comm, &mpiReqs[req_offset++]) );
// Send phase: one blocking send per sendCommMap entry, issued after all receives are posted.
372 s_iter = sendCommMap.begin(), s_end = sendCommMap.end();
374 for(; s_iter != s_end; ++s_iter) {
375 int sproc = s_iter->first;
376 const std::vector<T>& send_vec = s_iter->second;
377 int len = send_vec.size();
// const_cast: send_vec is const but the buffer pointer is declared as non-const T*.
378 T* send_buf = len>0 ?
const_cast<T*
>(&send_vec[0]) : NULL;
380 CHK_MPI( MPI_Send(send_buf, len, mpi_dtype, sproc, tag, comm) );
// Completion phase: MPI_Waitany is called once per request slot in mpiReqs.
384 for(
size_t i=0; i<mpiReqs.size(); ++i) {
387 CHK_MPI( MPI_Waitany(mpiReqs.size(), &mpiReqs[0], &index, &status) );
// Point-to-point exchange of per-rank vectors (vector-of-vectors overload).
//   sendProcs / sendData        : destination ranks and, index-aligned, the data for each
//   recvProcs / recvData        : source ranks and, index-aligned, the buffers to fill
//   recvDataLengthsKnownOnEntry : when false, receive lengths are first obtained with
//                                 exchangeIntData and recvData[i] is resized accordingly;
//                                 when true, the existing recvData[i].size() is used
// Visible return values: 0 when there is nothing to send or receive, -1 when sendProcs and
// sendData differ in length. Other returns are in lines missing from this listing.
// Entries whose rank equals `localProc` are skipped for both send and receive (no
// self-message); `localProc`, `tag`, `req_offset`, `index` and `status` are declared in
// lines not shown here.
397 int exchangeData(MPI_Comm comm,
398 std::vector<int>& sendProcs,
399 std::vector<std::vector<T> >& sendData,
400 std::vector<int>& recvProcs,
401 bool recvDataLengthsKnownOnEntry,
402 std::vector<std::vector<T> >& recvData)
404 if (sendProcs.size() == 0 && recvProcs.size() == 0)
return(0);
405 if (sendProcs.size() != sendData.size())
return(-1);
407 std::vector<MPI_Request> mpiReqs;
408 mpiReqs.resize(recvProcs.size());
411 MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
// Length-exchange phase: tell each destination how many items it will get, learn how many
// items each source will send.
413 if (!recvDataLengthsKnownOnEntry) {
414 std::vector<int> tmpIntData(sendData.size());
415 std::vector<int> recvLengths(recvProcs.size());
416 for(
unsigned i=0; i<sendData.size(); ++i) {
417 tmpIntData[i] = sendData[i].size();
// A non-zero result from exchangeIntData is treated as failure; the handling is not visible.
420 if (
exchangeIntData(comm, sendProcs, tmpIntData, recvProcs, recvLengths) != 0) {
// NOTE(review): recvData is indexed up to recvProcs.size() with no visible size check;
// callers must supply at least that many entries.
423 for(
unsigned i=0; i<recvProcs.size(); ++i) {
424 recvData[i].resize(recvLengths[i]);
// Receive phase: post one non-blocking receive per non-local source. numRecvProcs ends up
// as the number of requests actually posted (local entries are subtracted).
430 size_t numRecvProcs = recvProcs.size();
433 for(
size_t i=0; i<recvProcs.size(); ++i) {
434 if (recvProcs[i] == localProc) {--numRecvProcs;
continue; }
436 int len = recvData[i].size();
437 std::vector<T>& recv_vec = recvData[i];
// Zero-length receives pass NULL instead of indexing an empty vector.
438 T* recv_buf = len > 0 ? &recv_vec[0] : NULL;
440 CHK_MPI( MPI_Irecv(recv_buf, len, mpi_dtype, recvProcs[i],
441 tag, comm, &mpiReqs[req_offset++]) );
// Send phase: one blocking send per non-local destination.
446 for(
size_t i=0; i<sendProcs.size(); ++i) {
447 if (sendProcs[i] == localProc)
continue;
// NOTE(review): unlike the receive side, &send_buf[0] is formed even when the vector is
// empty -- confirm whether empty sends can occur.
449 std::vector<T>& send_buf = sendData[i];
450 CHK_MPI( MPI_Send(&send_buf[0], sendData[i].size(), mpi_dtype,
451 sendProcs[i], tag, comm) );
// Completion phase.
// NOTE(review): numRecvProcs already excludes local entries, yet this loop skips an
// iteration again when recvProcs[i] == localProc. If the local rank sits at an index below
// numRecvProcs, one fewer MPI_Waitany is issued than requests were posted -- this looks
// like a latent bug; confirm against the complete source.
455 for(
size_t i=0; i<numRecvProcs; ++i) {
456 if (recvProcs[i] == localProc)
continue;
459 CHK_MPI( MPI_Waitany(numRecvProcs, &mpiReqs[0], &index, &status) );
// Point-to-point exchange of per-rank vectors (vector-of-pointers overload).
//   sendProcs / sendData    : destination ranks and, index-aligned, pointers to the data
//   recvProcs / recvData    : source ranks and, index-aligned, pointers to buffers to fill
//   recvLengthsKnownOnEntry : when false, receive lengths are first obtained with
//                             exchangeIntData and *recvData[i] is resized accordingly
// The pointers in sendData / recvData are dereferenced without a null check in the visible
// lines, so callers must pass valid vectors.
// Visible return values: 0 when there is nothing to send or receive, -1 when sendProcs and
// sendData differ in length. Other returns are in lines missing from this listing, as are
// the declarations of `localProc`, `tag`, `req_offset`, `index` and `status`.
468 int exchangeData(MPI_Comm comm,
469 std::vector<int>& sendProcs,
470 std::vector<std::vector<T>*>& sendData,
471 std::vector<int>& recvProcs,
472 bool recvLengthsKnownOnEntry,
473 std::vector<std::vector<T>*>& recvData)
475 if (sendProcs.size() == 0 && recvProcs.size() == 0)
return(0);
476 if (sendProcs.size() != sendData.size())
return(-1);
479 MPI_Datatype mpi_dtype = fei::mpiTraits<T>::mpi_type();
480 std::vector<MPI_Request> mpiReqs;
483 mpiReqs.resize(recvProcs.size());
// Length-exchange phase.
485 if (!recvLengthsKnownOnEntry) {
486 std::vector<int> tmpIntData;
487 tmpIntData.resize(sendData.size());
// NOTE(review): recvLens is sized by sendData.size(), whereas the vector-of-vectors
// overload in this file sizes the equivalent buffer by recvProcs.size(). When the number
// of senders and receivers differ, the resize loop below would visit the wrong number of
// recvData entries (unless exchangeIntData resizes recvLens itself) -- likely should be
// recvProcs.size(); confirm against exchangeIntData's contract.
488 std::vector<int> recvLens(sendData.size());
489 for(
unsigned i=0; i<sendData.size(); ++i) {
490 tmpIntData[i] = (int)sendData[i]->size();
// A non-zero result from exchangeIntData is treated as failure; the handling is not visible.
493 if (
exchangeIntData(comm, sendProcs, tmpIntData, recvProcs, recvLens) != 0) {
497 for(
unsigned i=0; i<recvLens.size(); ++i) {
498 recvData[i]->resize(recvLens[i]);
// A std::runtime_error from the statements above is caught here; handler body not visible.
502 catch(std::runtime_error& exc) {
// Receive phase: post one non-blocking receive per non-local source. numRecvProcs ends up
// as the number of requests actually posted.
509 size_t numRecvProcs = recvProcs.size();
512 for(
unsigned i=0; i<recvProcs.size(); ++i) {
513 if (recvProcs[i] == localProc) {--numRecvProcs;
continue;}
515 size_t len = recvData[i]->size();
516 std::vector<T>& rbuf = *recvData[i];
// NOTE(review): &rbuf[0] is formed even when len == 0 (the vector-of-vectors overload
// passes NULL in that case) -- confirm whether empty receives can occur.
518 CHK_MPI( MPI_Irecv(&rbuf[0], (
int)len, mpi_dtype,
519 recvProcs[i], tag, comm, &mpiReqs[req_offset++]) );
// Send phase: one blocking send per non-local destination.
524 for(
unsigned i=0; i<sendProcs.size(); ++i) {
525 if (sendProcs[i] == localProc)
continue;
// NOTE(review): &sbuf[0] is formed even when sbuf is empty -- same concern as above.
527 std::vector<T>& sbuf = *sendData[i];
528 CHK_MPI( MPI_Send(&sbuf[0], (
int)sbuf.size(), mpi_dtype,
529 sendProcs[i], tag, comm) );
// Completion phase.
// NOTE(review): numRecvProcs already excludes local entries, yet this loop skips an
// iteration again when recvProcs[i] == localProc, so one posted request can be left
// un-waited if the local rank sits at an index below numRecvProcs -- confirm.
533 for(
unsigned i=0; i<numRecvProcs; ++i) {
534 if (recvProcs[i] == localProc)
continue;
537 CHK_MPI( MPI_Waitany((
int)numRecvProcs, &mpiReqs[0], &index, &status) );
// Abstract callback interface consumed by exchange(): the handler supplies outgoing
// messages and (per the calls made in exchange()) also exposes getSendProcs(),
// getRecvProcs(), getSendMessageLength() and processRecvMessage(). Only the destructor and
// getSendMessage() are visible in this listing; the template header and closing brace are
// among the missing lines.
564 class MessageHandler {
// Virtual destructor with an empty body.
566 virtual ~MessageHandler(){}
// Pure virtual: fill `message` with the data destined for rank `destProc`.
// exchange() treats the int result as an error code via CHK_ERR.
586 virtual int getSendMessage(
int destProc, std::vector<T>& message) = 0;
// Drives a full message exchange through a MessageHandler:
//   1. ask the handler for the send and receive rank lists,
//   2. ask it for each outgoing message,
//   3. exchange the data with exchangeData() (receive lengths are NOT assumed known, so
//      exchangeData performs its own length exchange),
//   4. hand every received message back to the handler.
// Returns 0 immediately when fewer than two processes are present. Handler and
// exchangeData results are wrapped in CHK_ERR (macro defined elsewhere). The declarations
// of `numProcs` and `i`, and the final return, are in lines missing from this listing.
597 int exchange(MPI_Comm comm, MessageHandler<T>* msgHandler)
603 if (numProcs < 2)
return(0);
605 std::vector<int>& sendProcs = msgHandler->getSendProcs();
606 int numSendProcs = sendProcs.size();
607 std::vector<int>& recvProcs = msgHandler->getRecvProcs();
608 int numRecvProcs = recvProcs.size();
// Nothing to send and nothing to receive; the body of this branch is not visible.
611 if (numSendProcs < 1 && numRecvProcs < 1) {
// NOTE(review): sendMsgLengths is filled from the handler but is not passed to
// exchangeData in the visible lines -- possibly queried only for its side effects or
// error check; confirm.
615 std::vector<int> sendMsgLengths(numSendProcs);
617 for(i=0; i<numSendProcs; ++i) {
618 CHK_ERR( msgHandler->getSendMessageLength(sendProcs[i], sendMsgLengths[i]) );
// One receive buffer per source rank, index-aligned with recvProcs.
621 std::vector<std::vector<T> > recvMsgs(numRecvProcs);
// One outgoing message per destination rank, index-aligned with sendProcs.
623 std::vector<std::vector<T> > sendMsgs(numSendProcs);
624 for(i=0; i<numSendProcs; ++i) {
625 CHK_ERR( msgHandler->getSendMessage(sendProcs[i], sendMsgs[i]) );
// `false`: receive lengths are unknown here, so exchangeData sizes recvMsgs itself.
628 CHK_ERR( exchangeData(comm, sendProcs, sendMsgs,
629 recvProcs,
false, recvMsgs) );
// Deliver each received message to the handler together with its source rank.
631 for(i=0; i<numRecvProcs; ++i) {
632 std::vector<T>& recvMsg = recvMsgs[i];
633 CHK_ERR( msgHandler->processRecvMessage(recvProcs[i], recvMsg ) );
643 #endif // _fei_CommUtils_hpp_
virtual int getSendMessageLength(int destProc, int &messageLength)=0
int GlobalSum(MPI_Comm comm, std::vector< T > &local, std::vector< T > &global)
int Allgatherv(MPI_Comm comm, std::vector< T > &sendbuf, std::vector< int > &recvLengths, std::vector< T > &recvbuf)
int mirrorCommPattern(MPI_Comm comm, comm_map *inPattern, comm_map *&outPattern)
virtual std::vector< int > & getRecvProcs()=0
virtual int getSendMessage(int destProc, std::vector< T > &message)=0
virtual int processRecvMessage(int srcProc, std::vector< T > &message)=0
int exchangeCommMapData(MPI_Comm comm, const typename CommMap< T >::Type &sendCommMap, typename CommMap< T >::Type &recvCommMap, bool recvProcsKnownOnEntry=false, bool recvLengthsKnownOnEntry=false)
void copyKeysToVector(const MAP_TYPE &map_obj, std::vector< int > &keyvector)
int mirrorProcs(MPI_Comm comm, std::vector< int > &toProcs, std::vector< int > &fromProcs)
virtual std::vector< int > & getSendProcs()=0
int GlobalMin(MPI_Comm comm, std::vector< T > &local, std::vector< T > &global)
int Allreduce(MPI_Comm comm, bool localBool, bool &globalBool)
std::ostream & console_out()
int GlobalMax(MPI_Comm comm, std::vector< T > &local, std::vector< T > &global)
int exchangeIntData(MPI_Comm comm, const std::vector< int > &sendProcs, std::vector< int > &sendData, const std::vector< int > &recvProcs, std::vector< int > &recvData)
int localProc(MPI_Comm comm)
int numProcs(MPI_Comm comm)