Tpetra parallel linear algebra  Version of the Day
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Tpetra_Details_ReadTriples.hpp
Go to the documentation of this file.
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 #ifndef TPETRA_DETAILS_READTRIPLES_HPP
11 #define TPETRA_DETAILS_READTRIPLES_HPP
12 
21 
22 #include "TpetraCore_config.h"
23 #include "Tpetra_Details_PackTriples.hpp"
24 #if KOKKOS_VERSION >= 40799
25 #include "KokkosKernels_ArithTraits.hpp"
26 #else
27 #include "Kokkos_ArithTraits.hpp"
28 #endif
29 #include "Teuchos_MatrixMarket_generic.hpp"
30 #include "Teuchos_CommHelpers.hpp"
31 #include <iostream>
32 #include <typeinfo> // for debugging
33 
34 namespace Tpetra {
35 namespace Details {
36 
37 //
38 // Search for "SKIP DOWN TO HERE" (omit quotes) for the "public"
39 // interface. I put "public" in quotes because it's public only for
40 // Tpetra developers, NOT for Tpetra users.
41 //
42 
43 namespace Impl {
44 
45 // mfh 01 Feb 2017: Unfortunately,
46 // Teuchos::MatrixMarket::readComplexData requires Teuchos to have
47 // complex arithmetic support enabled. To avoid this issue, I
48 // reimplement the function here. It's not very long.
49 
82 template <class OrdinalType, class RealType>
83 bool readComplexData(std::istream& istr,
84  OrdinalType& rowIndex,
85  OrdinalType& colIndex,
86  RealType& realPart,
87  RealType& imagPart,
88  const std::size_t lineNumber,
89  const bool tolerant) {
90  using ::Teuchos::MatrixMarket::readRealData;
91 
92  RealType the_realPart, the_imagPart;
93  if (!readRealData(istr, rowIndex, colIndex, the_realPart, lineNumber, tolerant)) {
94  if (tolerant) {
95  return false;
96  } else {
97  std::ostringstream os;
98  os << "Failed to read pattern data and/or real value from line "
99  << lineNumber << " of input";
100  throw std::invalid_argument(os.str());
101  }
102  }
103  if (istr.eof()) {
104  if (tolerant) {
105  return false;
106  } else {
107  std::ostringstream os;
108  os << "No more data after real value on line "
109  << lineNumber << " of input";
110  throw std::invalid_argument(os.str());
111  }
112  }
113  istr >> the_imagPart;
114  if (istr.fail()) {
115  if (tolerant) {
116  return false;
117  } else {
118  std::ostringstream os;
119  os << "Failed to get imaginary value from line "
120  << lineNumber << " of input";
121  throw std::invalid_argument(os.str());
122  }
123  }
124  realPart = the_realPart;
125  imagPart = the_imagPart;
126  return true;
127 }
128 
138 template <class SC,
139  class GO,
140 #if KOKKOS_VERSION >= 40799
141  const bool isComplex = ::KokkosKernels::ArithTraits<SC>::is_complex>
142 #else
143  const bool isComplex = ::Kokkos::ArithTraits<SC>::is_complex>
144 #endif
145 struct ReadLine {
166  static int
167  readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
168  const std::string& line,
169  const std::size_t lineNumber,
170  const bool tolerant = false,
171  std::ostream* errStrm = NULL,
172  const bool debug = false);
173 };
174 
182 template <class SC, class GO>
183 struct ReadLine<SC, GO, true> {
204  static int
205  readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
206  const std::string& line,
207  const std::size_t lineNumber,
208  const bool tolerant = false,
209  std::ostream* errStrm = NULL,
210  const bool debug = false) {
211  using ::Teuchos::MatrixMarket::checkCommentLine;
212 #if KOKKOS_VERSION >= 40799
213  typedef typename ::KokkosKernels::ArithTraits<SC>::mag_type real_type;
214 #else
215  typedef typename ::Kokkos::ArithTraits<SC>::mag_type real_type;
216 #endif
217  using std::endl;
218 
219  GO rowInd, colInd;
220  real_type realPart, imagPart;
221  std::istringstream istr(line);
222  bool success = true;
223  try {
224  // Use the version of this function in this file, not the
225  // version in Teuchos_MatrixMarket_generic.hpp, because the
226  // latter only exists if HAVE_TEUCHOS_COMPLEX is defined.
227  success = readComplexData(istr, rowInd, colInd, realPart, imagPart,
228  lineNumber, tolerant);
229  } catch (std::exception& e) {
230  success = false;
231  if (errStrm != NULL) {
232  std::ostringstream os;
233  os << "readLine: readComplexData threw an exception: " << e.what()
234  << endl;
235  *errStrm << os.str();
236  }
237  }
238 
239  if (success) {
240  // if (debug && errStrm != NULL) {
241  // std::ostringstream os;
242  // os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
243  // << ", realPart=" << realPart << ", imagPart=" << imagPart
244  // << std::endl;
245  // *errStrm << os.str ();
246  // }
247  // This line may have side effects.
248  const int errCode =
249  processTriple(rowInd, colInd, SC(realPart, imagPart));
250  if (errCode != 0 && errStrm != NULL) {
251  std::ostringstream os;
252  os << "readLine: processTriple returned " << errCode << " != 0."
253  << endl;
254  *errStrm << os.str();
255  }
256  return errCode;
257  } else {
258  return -1;
259  }
260  }
261 };
262 
270 template <class SC, class GO>
271 struct ReadLine<SC, GO, false> {
292  static int
293  readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
294  const std::string& line,
295  const std::size_t lineNumber,
296  const bool tolerant = false,
297  std::ostream* errStrm = NULL,
298  const bool debug = false) {
299  using std::endl;
300  using ::Teuchos::MatrixMarket::checkCommentLine;
301  using ::Teuchos::MatrixMarket::readRealData;
302 
303  GO rowInd, colInd;
304  SC val;
305  std::istringstream istr(line);
306  bool success = true;
307  try {
308  success = readRealData(istr, rowInd, colInd, val,
309  lineNumber, tolerant);
310  } catch (std::exception& e) {
311  success = false;
312  if (errStrm != NULL) {
313  std::ostringstream os;
314  os << "readLine: readRealData threw an exception: " << e.what()
315  << endl;
316  *errStrm << os.str();
317  }
318  }
319 
320  if (success) {
321  if (debug && errStrm != NULL) {
322  std::ostringstream os;
323  os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
324  << ", val=" << val << std::endl;
325  *errStrm << os.str();
326  }
327  // This line may have side effects.
328  const int errCode = processTriple(rowInd, colInd, val);
329  if (errCode != 0 && errStrm != NULL) {
330  std::ostringstream os;
331  os << "readLine: processTriple returned " << errCode << " != 0."
332  << endl;
333  *errStrm << os.str();
334  }
335  return errCode;
336  } else {
337  return -1;
338  }
339  }
340 };
341 
367 template <class SC, class GO>
368 int readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
369  const std::string& line,
370  const std::size_t lineNumber,
371  const bool tolerant = false,
372  std::ostream* errStrm = NULL,
373  const bool debug = false) {
374  return ReadLine<SC, GO>::readLine(processTriple, line, lineNumber,
375  tolerant, errStrm, debug);
376 }
377 
408 template <class SC, class GO>
409 int readTriples(std::istream& inputStream,
410  std::size_t& curLineNum,
411  std::size_t& numTriplesRead,
412  std::function<int(const GO, const GO, const SC&)> processTriple,
413  const std::size_t maxNumTriplesToRead,
414  const bool tolerant = false,
415  std::ostream* errStrm = NULL,
416  const bool debug = false) {
417  using std::endl;
418  using std::size_t;
419  using Teuchos::MatrixMarket::checkCommentLine;
420 
421  numTriplesRead = 0; // output argument only
422  if (inputStream.eof()) {
423  return 0; // no error, just nothing left to read
424  } else if (inputStream.fail()) {
425  if (errStrm != NULL) {
426  *errStrm << "Input stream reports a failure (not the same as "
427  "end-of-file)."
428  << endl;
429  }
430  return -1;
431  }
432 
433  std::string line;
434  std::vector<size_t> badLineNumbers;
435  int errCode = 0; // 0 means success
436 
437  bool inputStreamCanStillBeRead = std::getline(inputStream, line).good();
438  ++curLineNum; // we read the line; we can't put it back
439  while (inputStreamCanStillBeRead && numTriplesRead < maxNumTriplesToRead) {
440  // if (debug && errStrm != NULL) {
441  // std::ostringstream os;
442  // os << "readTriples: Got line: \"" << line << "\"" << std::endl;
443  // *errStrm << os.str ();
444  // }
445  size_t start, size;
446 
447  const bool isCommentLine =
448  checkCommentLine(line, start, size, curLineNum, tolerant);
449  if (isCommentLine) {
450  // Move on to the next line, if there is a next line.
451  inputStreamCanStillBeRead = std::getline(inputStream, line).good();
452  ++curLineNum; // we read another line; we can't put it back
453  continue; // move on to the next line
454  } else { // not a comment line; should have a sparse matrix entry
455  const std::string theLine = line.substr(start, size);
456  // If the line has a valid sparse matrix entry, extract it and
457  // hand it off to the processTriple closure.
458  const int curErrCode =
459  readLine(processTriple, theLine, curLineNum, tolerant, errStrm, debug);
460  if (curErrCode != 0) {
461  errCode = curErrCode;
462  badLineNumbers.push_back(curLineNum);
463  } else {
464  ++numTriplesRead;
465  }
466  if (numTriplesRead < maxNumTriplesToRead) {
467  inputStreamCanStillBeRead = std::getline(inputStream, line).good();
468  }
469  }
470  } // while there are lines to read and we need more triples
471 
472  if (errCode != 0 && errStrm != NULL) {
473  const size_t numBadLines = badLineNumbers.size();
474  *errStrm << "Encountered " << numBadLines << " bad line"
475  << (numBadLines != size_t(1) ? "s" : "")
476  << ": [";
477  for (size_t k = 0; k < numBadLines; ++k) {
478  *errStrm << badLineNumbers[k];
479  if (k + 1 < numBadLines) {
480  *errStrm << ", ";
481  }
482  }
483  *errStrm << "]" << endl;
484  }
485  if (!inputStream.eof() && inputStream.fail()) {
486  if (errCode == 0) {
487  errCode = -1;
488  }
489  if (errStrm != NULL) {
490  *errStrm << "The input stream is not at end-of-file, "
491  "but is in a bad state."
492  << endl;
493  }
494  }
495  return errCode;
496 }
497 
523 template <class SC, class GO>
524 int readAndSendOneBatchOfTriples(std::istream& inputStream,
525  std::size_t& curLineNum,
526  std::size_t& numEntRead,
527  ::Teuchos::ArrayRCP<int>& sizeBuf,
528  ::Teuchos::ArrayRCP<char>& msgBuf,
529  std::vector<GO>& rowInds,
530  std::vector<GO>& colInds,
531  std::vector<SC>& vals,
532  const std::size_t maxNumEntPerMsg,
533  const int destRank,
534  const ::Teuchos::Comm<int>& comm,
535  const bool tolerant = false,
536  std::ostream* errStrm = NULL,
537  const bool debug = false) {
538  using std::endl;
539  using ::Teuchos::isend;
540  using ::Tpetra::Details::countPackTriples;
541  using ::Tpetra::Details::countPackTriplesCount;
542  using ::Tpetra::Details::packTriples;
543  using ::Tpetra::Details::packTriplesCount;
544 
545 #if KOKKOS_VERSION >= 40799
546  using ::KokkosKernels::ArithTraits;
547 #else
548  using ::Kokkos::ArithTraits;
549 #endif
550  // constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
551  // constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
552  constexpr int sizeTag = 42;
553  constexpr int msgTag = 43;
554  // constexpr int srcRank = 0;
555  int errCode = 0;
556 
557  // This doesn't actually deallocate memory; it just changes the size
558  // back to zero, so that push_back starts over from the beginning.
559  rowInds.resize(0);
560  colInds.resize(0);
561  vals.resize(0);
562  // Closure that adds the new matrix entry to the above temp arrays.
563  auto processTriple = [&rowInds, &colInds, &vals](const GO rowInd, const GO colInd, const SC& val) {
564  try {
565  rowInds.push_back(rowInd);
566  colInds.push_back(colInd);
567  vals.push_back(val);
568  } catch (...) {
569  return -1;
570  }
571  return 0;
572  };
573  numEntRead = 0; // output argument
574  errCode = readTriples<SC, GO>(inputStream, curLineNum, numEntRead,
575  processTriple, maxNumEntPerMsg, tolerant,
576  errStrm, debug);
577  if (debug && errStrm != NULL) {
578  std::ostringstream os;
579  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
580  << ", GO=" << typeid(GO).name() << ": "
581  << "readAndSendOneBatchOfTriples: readTriples read "
582  << numEntRead << " matrix entries, and returned errCode="
583  << errCode << "." << std::endl;
584  *errStrm << os.str();
585  }
586  if (numEntRead != rowInds.size() ||
587  numEntRead != colInds.size() ||
588  numEntRead != vals.size()) {
589  if (errStrm != NULL) {
590  *errStrm << "readTriples size results are not consistent. "
591  << "numEntRead = " << numEntRead
592  << ", rowInds.size() = " << rowInds.size()
593  << ", colInds.size() = " << colInds.size()
594  << ", and vals.size() = " << vals.size() << "."
595  << std::endl;
596  }
597  if (errCode == 0) {
598  errCode = -1;
599  }
600  }
601 
602  // We don't consider reading having "failed" if we've reached
603  // end-of-file before reading maxNumEntPerMsg entries. It's OK if
604  // we got fewer triples than that. Furthermore, we have to send at
605  // least one message to the destination process, even if the read
606  // from the file failed.
607 
608  if (numEntRead == 0 || errCode != 0) {
609  // Send a message size of zero to the receiving process, to tell
610  // it that we have no triples to send, or that there was an error
611  // reading. The latter just means that we "go through the
612  // motions," then broadcast the error code.
613  sizeBuf[0] = 0;
614  if (debug && errStrm != NULL) {
615  std::ostringstream os;
616  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
617  << ", GO=" << typeid(GO).name() << ": "
618  << "Post send (size=0, errCode=" << errCode << ") "
619  << "to " << destRank << " with tag " << sizeTag << endl;
620  *errStrm << os.str();
621  }
622  send(sizeBuf.getRawPtr(), 1, destRank, sizeTag, comm);
623  return errCode;
624  } else { // we read a nonzero # of triples, without error
625  const int numEnt = static_cast<int>(numEntRead);
626  int countSize = 0; // output argument
627  int triplesSize = 0; // output argument
628 
629  errCode = countPackTriplesCount(comm, countSize, errStrm);
630  // countSize should never be nonpositive, since we have to pack an
631  // integer size.
632  if (countSize <= 0 && errCode == 0) {
633  errCode = -1;
634  }
635 
636  if (errCode != 0) {
637  // Send zero to the receiving process, to tell it about the error.
638  sizeBuf[0] = 0;
639  if (debug && errStrm != NULL) {
640  std::ostringstream os;
641  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
642  << ", GO=" << typeid(GO).name() << ": "
643  << "Post send (size=0, error case) to " << destRank
644  << " with tag " << sizeTag << endl;
645  *errStrm << os.str();
646  }
647  send(sizeBuf.getRawPtr(), 1, destRank, sizeTag, comm);
648  return errCode;
649  } else { // countPackTriplesCount succeeded
650  errCode = countPackTriples<SC, GO>(numEnt, comm, triplesSize, errStrm);
651  if (errCode != 0) {
652  // Send a message size of zero to the receiving process, to
653  // tell it that there was an error counting.
654  sizeBuf[0] = 0;
655  if (debug && errStrm != NULL) {
656  std::ostringstream os;
657  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
658  << ", GO=" << typeid(GO).name() << ": "
659  << "Post send (size=0, error case) to " << destRank
660  << " with tag " << sizeTag << endl;
661  *errStrm << os.str();
662  }
663  send(sizeBuf.getRawPtr(), 1, destRank, sizeTag, comm);
664  return errCode;
665  } else { // countPackTriples succeeded; message packed & ready to send
666  // Send the message size (in bytes). We can use a nonblocking
667  // send here, and try to overlap with message packing.
668  const int outBufSize = countSize + triplesSize;
669  sizeBuf[0] = outBufSize;
670  if (debug && errStrm != NULL) {
671  std::ostringstream os;
672  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
673  << ", GO=" << typeid(GO).name() << ": "
674  << "Post isend (size=" << sizeBuf[0] << ") to " << destRank
675  << " with tag " << sizeTag << endl;
676  *errStrm << os.str();
677  }
678  auto sizeReq = isend<int, int>(sizeBuf, destRank, sizeTag, comm);
679 
680  msgBuf.resize(outBufSize);
681  char* outBuf = msgBuf.getRawPtr();
682 
683  // If anything goes wrong with packing, send the pack buffer
684  // anyway, since the receiving process expects a message.
685  int outBufCurPos = 0; // input/output argument
686  errCode = packTriplesCount(numEnt, outBuf, outBufSize,
687  outBufCurPos, comm, errStrm);
688  if (errCode == 0) {
689  errCode = packTriples<SC, GO>(rowInds.data(), colInds.data(),
690  vals.data(), numEnt, outBuf,
691  outBufSize, outBufCurPos, comm,
692  errStrm);
693  }
694  if (debug && errStrm != NULL) {
695  std::ostringstream os;
696  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
697  << ", GO=" << typeid(GO).name() << ": "
698  << "Post isend (packed data) to " << destRank
699  << " with tag " << msgTag << endl;
700  *errStrm << os.str();
701  }
702  auto msgReq = isend<int, char>(msgBuf, destRank, msgTag, comm);
703 
704  // Wait on the two messages. It doesn't matter in what order
705  // we send them, because they have different tags. The
706  // receiving process will wait on the first message first, in
707  // order to get the size of the second message.
708  if (debug && errStrm != NULL) {
709  std::ostringstream os;
710  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
711  << ", GO=" << typeid(GO).name() << ": "
712  << "Wait on isend (size)" << endl;
713  *errStrm << os.str();
714  }
715  sizeReq->wait();
716  if (debug && errStrm != NULL) {
717  std::ostringstream os;
718  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
719  << ", GO=" << typeid(GO).name() << ": "
720  << "Wait on isend (packed data)" << endl;
721  *errStrm << os.str();
722  }
723  msgReq->wait();
724 
725  // This doesn't actually deallocate; it just resets sizes to zero.
726  rowInds.clear();
727  colInds.clear();
728  vals.clear();
729  }
730  }
731  }
732  return errCode;
733 }
734 
742 
770 template <class SC, class GO, class CommRequestPtr>
771 int recvOneBatchOfTriples(std::vector<GO>& rowInds,
772  std::vector<GO>& colInds,
773  std::vector<SC>& vals,
774  int& numEnt,
775  ::Teuchos::ArrayRCP<int>& sizeBuf,
776  ::Teuchos::ArrayRCP<char>& msgBuf,
777  CommRequestPtr& sizeReq,
778  const int srcRank,
779  const ::Teuchos::Comm<int>& comm,
780  const bool tolerant = false,
781  std::ostream* errStrm = NULL,
782  const bool debug = false) {
783 #if KOKKOS_VERSION >= 40799
784  using ::KokkosKernels::ArithTraits;
785 #else
786  using ::Kokkos::ArithTraits;
787 #endif
788  using ::Tpetra::Details::unpackTriples;
789  using ::Tpetra::Details::unpackTriplesCount;
790 
792  // constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
793  // constexpr int sizeTag = 42;
794  constexpr int msgTag = 43;
795  int errCode = 0; // return value
796  numEnt = 0; // output argument
797 
798  // Wait on the ireceive we preposted before calling this function.
799  if (debug && errStrm != NULL) {
800  std::ostringstream os;
801  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
802  << ", GO=" << typeid(GO).name() << ": "
803  << "Wait on irecv (size)" << std::endl;
804  *errStrm << os.str();
805  }
806  sizeReq->wait();
807  sizeReq = CommRequestPtr(NULL);
808  const int inBufSize = sizeBuf[0];
809  if (debug && errStrm != NULL) {
810  std::ostringstream os;
811  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
812  << ", GO=" << typeid(GO).name() << ": "
813  << "Received size: sizeBuf[0]=" << sizeBuf[0] << std::endl;
814  *errStrm << os.str();
815  }
816 
817  if (inBufSize == 0) {
818  numEnt = 0;
819  rowInds.resize(0);
820  colInds.resize(0);
821  vals.resize(0);
822  } else {
823  msgBuf.resize(inBufSize);
824  char* inBuf = msgBuf.getRawPtr();
825 
826  if (debug && errStrm != NULL) {
827  std::ostringstream os;
828  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
829  << ", GO=" << typeid(GO).name() << ": "
830  << "Post irecv (packed data) "
831  << "from " << srcRank
832  << " with tag " << msgTag << std::endl;
833  *errStrm << os.str();
834  }
835  auto msgReq = ::Teuchos::ireceive(msgBuf, srcRank, msgTag, comm);
836  if (debug && errStrm != NULL) {
837  std::ostringstream os;
838  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
839  << ", GO=" << typeid(GO).name() << ": "
840  << "Wait on irecv (packed data)" << std::endl;
841  *errStrm << os.str();
842  }
843  msgReq->wait();
844 
845  int inBufCurPos = 0; // output argument
846  errCode = unpackTriplesCount(inBuf, inBufSize, inBufCurPos,
847  numEnt, comm, errStrm);
848  if (errCode == 0) {
849  rowInds.resize(numEnt);
850  colInds.resize(numEnt);
851  vals.resize(numEnt);
852  errCode = unpackTriples<SC, GO>(inBuf, inBufSize, inBufCurPos,
853  rowInds.data(), colInds.data(),
854  vals.data(), numEnt, comm, errStrm);
855  }
856  }
857  return errCode;
858 }
859 
860 } // namespace Impl
861 
862 //
863 // SKIP DOWN TO HERE FOR "PUBLIC" INTERFACE
864 //
865 
899 template <class SC, class GO>
900 int readAndDealOutTriples(std::istream& inputStream, // only valid on Proc 0
901  std::size_t& curLineNum, // only valid on Proc 0
902  std::size_t& totalNumEntRead, // only valid on Proc 0
903  std::function<int(const GO, const GO, const SC&)> processTriple,
904  const std::size_t maxNumEntPerMsg,
905  const ::Teuchos::Comm<int>& comm,
906  const bool tolerant = false,
907  std::ostream* errStrm = NULL,
908  const bool debug = false) {
909 #if KOKKOS_VERSION >= 40799
910  using KokkosKernels::ArithTraits;
911 #else
912  using Kokkos::ArithTraits;
913 #endif
914  using std::endl;
915  using std::size_t;
916 
917  constexpr int srcRank = 0;
918  // constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
920  constexpr int sizeTag = 42;
921  // constexpr int msgTag = 43;
922  const int myRank = comm.getRank();
923  const int numProcs = comm.getSize();
924  int errCode = 0;
925 
926  ::Teuchos::ArrayRCP<int> sizeBuf(1);
927  ::Teuchos::ArrayRCP<char> msgBuf; // to be resized as needed
928 
929  // Temporary storage for reading & packing (on Process srcRank) or
930  // unpacking (every other process) triples.
931  std::vector<GO> rowInds;
932  std::vector<GO> colInds;
933  std::vector<SC> vals;
934  rowInds.reserve(maxNumEntPerMsg);
935  colInds.reserve(maxNumEntPerMsg);
936  vals.reserve(maxNumEntPerMsg);
937 
938  totalNumEntRead = 0;
939  if (myRank == srcRank) {
940  // Loop around through all the processes, including this one, over
941  // and over until we reach the end of the file, or an error occurs.
942  int destRank = 0;
943  bool lastMessageWasLegitZero = false;
944  for (;
945  !inputStream.eof() && errCode == 0;
946  destRank = (destRank + 1) % numProcs) {
947  size_t curNumEntRead = 0; // output argument of below
948  if (destRank == srcRank) {
949  // We can read and process the triples directly. We don't
950  // need to use intermediate storage, because we don't need to
951  // pack and send the triples.
952  const int readErrCode =
953  Impl::readTriples<SC, GO>(inputStream, curLineNum, curNumEntRead,
954  processTriple, maxNumEntPerMsg, tolerant,
955  errStrm, debug);
956  if (debug && errStrm != NULL) {
957  std::ostringstream os;
958  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
959  << ", GO=" << typeid(GO).name() << ": "
960  << "(dest=src) readTriples returned curNumEntRead="
961  << curNumEntRead << ", errCode=" << readErrCode << endl;
962  *errStrm << os.str();
963  }
964  errCode = (readErrCode != 0) ? readErrCode : errCode;
965  } else {
966  if (false && debug && errStrm != NULL) {
967  std::ostringstream os;
968  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
969  << ", GO=" << typeid(GO).name() << ": "
970  << "Calling readAndSend... with destRank=" << destRank << endl;
971  *errStrm << os.str();
972  }
973  // Read, pack, and send the triples to destRank.
974  const int readAndSendErrCode =
975  Impl::readAndSendOneBatchOfTriples<SC, GO>(inputStream, curLineNum,
976  curNumEntRead,
977  sizeBuf, msgBuf,
978  rowInds, colInds, vals,
979  maxNumEntPerMsg, destRank,
980  comm, tolerant, errStrm,
981  debug);
982  totalNumEntRead += curNumEntRead;
983  if (debug && errStrm != NULL) {
984  std::ostringstream os;
985  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
986  << ", GO=" << typeid(GO).name() << ": "
987  << "readAndSend... with destRank=" << destRank
988  << " returned curNumEntRead=" << curNumEntRead
989  << ", errCode=" << readAndSendErrCode << endl;
990  *errStrm << os.str();
991  }
992  errCode = (readAndSendErrCode != 0) ? readAndSendErrCode : errCode;
993  if (readAndSendErrCode == 0 && curNumEntRead == 0) {
994  lastMessageWasLegitZero = true;
995  if (debug && errStrm != NULL) {
996  std::ostringstream os;
997  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
998  << ", GO=" << typeid(GO).name() << ": "
999  << "Last send to " << destRank << " with tag " << sizeTag
1000  << " was legit zero, counts as termination" << endl;
1001  *errStrm << os.str();
1002  }
1003  }
1004  }
1005  } // loop around through processes until done reading file, or error
1006 
1007  // Loop around through the remaining processes, and tell them that
1008  // we're done, by sending zero. If the last message we sent to
1009  // destRank was zero, then skip that process, since it only
1010  // expects one message of size zero. Note that destRank got
1011  // incremented mod numProcs at end of loop, so we have to
1012  // decrement it mod numProcs.
1013  destRank = (destRank - 1) % numProcs;
1014  if (destRank < 0) { // C mod operator does not promise positivity
1015  destRank = destRank + numProcs;
1016  }
1017 
1018  const int startRank = lastMessageWasLegitZero ? (destRank + 1) : destRank;
1019  for (int outRank = startRank; outRank < numProcs; ++outRank) {
1020  if (outRank != srcRank) {
1021  if (debug && errStrm != NULL) {
1022  std::ostringstream os;
1023  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1024  << ", GO=" << typeid(GO).name() << ": "
1025  << "Post send (size, termination msg) to " << outRank
1026  << " with tag " << sizeTag << "(was last message legit zero? "
1027  << (lastMessageWasLegitZero ? "true" : "false") << ")" << endl;
1028  *errStrm << os.str();
1029  }
1030  sizeBuf[0] = 0;
1031  ::Teuchos::send(sizeBuf.getRawPtr(), 1, outRank, sizeTag, comm);
1032  }
1033  }
1034  } else {
1035  while (true) {
1036  // Prepost a message to receive the size (in bytes) of the
1037  // incoming packet.
1038  sizeBuf[0] = 0; // superfluous, but safe
1039  if (debug && errStrm != NULL) {
1040  std::ostringstream os;
1041  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1042  << ", GO=" << typeid(GO).name() << ": "
1043  << "Post irecv (size) from " << srcRank
1044  << " with tag " << sizeTag << std::endl;
1045  *errStrm << os.str();
1046  }
1047  auto sizeReq = ::Teuchos::ireceive(sizeBuf, srcRank, sizeTag, comm);
1048 
1049  int numEnt = 0; // output argument
1050  const int recvErrCode =
1051  Impl::recvOneBatchOfTriples(rowInds, colInds, vals, numEnt, sizeBuf,
1052  msgBuf, sizeReq, srcRank, comm, tolerant,
1053  errStrm, debug);
1054  if (debug && errStrm != NULL) {
1055  std::ostringstream os;
1056  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1057  << ", GO=" << typeid(GO).name() << ": "
1058  << "recvOneBatchOfTriples returned numEnt=" << numEnt
1059  << ", errCode=" << recvErrCode << endl;
1060  *errStrm << os.str();
1061  }
1062  errCode = (recvErrCode != 0) ? recvErrCode : errCode;
1063 
1064  if (numEnt != static_cast<int>(rowInds.size()) ||
1065  numEnt != static_cast<int>(colInds.size()) ||
1066  numEnt != static_cast<int>(vals.size())) {
1067  errCode = (errCode == 0) ? -1 : errCode;
1068  if (errStrm != NULL) {
1069  *errStrm << "recvOneBatchOfTriples produced inconsistent data sizes. "
1070  << "numEnt = " << numEnt
1071  << ", rowInds.size() = " << rowInds.size()
1072  << ", colInds.size() = " << colInds.size()
1073  << ", vals.size() = " << vals.size() << "."
1074  << endl;
1075  }
1076  } // if sizes inconsistent
1077 
1078  // Sending zero items is how Process srcRank tells this process
1079  // that it (Process srcRank) is done sending out data.
1080  if (numEnt == 0) {
1081  break;
1082  }
1083 
1084  for (int k = 0; k < numEnt && errCode == 0; ++k) {
1085  const int curErrCode = processTriple(rowInds[k], colInds[k], vals[k]);
1086  errCode = (curErrCode == 0) ? errCode : curErrCode;
1087  }
1088  } // while we still get messages from srcRank
1089  }
1090 
1091  if (debug && errStrm != NULL) {
1092  std::ostringstream os;
1093  os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1094  << ", GO=" << typeid(GO).name() << ": "
1095  << "Done with send/recv loop" << endl;
1096  *errStrm << os.str();
1097  }
1098  // Do a bitwise OR to get an error code that is nonzero if and only
1099  // if any process' local error code is nonzero.
1100  using ::Teuchos::outArg;
1101  using ::Teuchos::REDUCE_BOR;
1102  using ::Teuchos::reduceAll;
1103  const int lclErrCode = errCode;
1104  reduceAll<int, int>(comm, REDUCE_BOR, lclErrCode, outArg(errCode));
1105  return errCode;
1106 }
1107 
1108 } // namespace Details
1109 } // namespace Tpetra
1110 
1111 #endif // TPETRA_DETAILS_READTRIPLES_HPP
int packTriplesCount(const int, char[], const int, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Pack the count (number) of matrix triples.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
int readAndDealOutTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &totalNumEntRead, std::function< int(const GO, const GO, const SC &)> processTriple, const std::size_t maxNumEntPerMsg, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
On Process 0 in the given communicator, read sparse matrix entries (in chunks of at most maxNumEntPer...
Implementation of the readLine stand-alone function in this namespace (see below).
int unpackTriplesCount(const char[], const int, int &, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Unpack just the count of triples from the given input buffer.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
int countPackTriplesCount(const ::Teuchos::Comm< int > &, int &size, std::ostream *errStrm)
Compute the buffer size required by packTriples for packing the number of matrix entries (&quot;triples&quot;)...
void start()
Start the deep_copy counter.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...