Tpetra parallel linear algebra  Version of the Day
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Tpetra_Details_ReadTriples.hpp
Go to the documentation of this file.
1 // @HEADER
2 // *****************************************************************************
3 // Tpetra: Templated Linear Algebra Services Package
4 //
5 // Copyright 2008 NTESS and the Tpetra contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 #ifndef TPETRA_DETAILS_READTRIPLES_HPP
11 #define TPETRA_DETAILS_READTRIPLES_HPP
12 
21 
22 #include "TpetraCore_config.h"
23 #include "Tpetra_Details_PackTriples.hpp"
24 #include "Kokkos_ArithTraits.hpp"
25 #include "Teuchos_MatrixMarket_generic.hpp"
26 #include "Teuchos_CommHelpers.hpp"
27 #include <iostream>
28 #include <typeinfo> // for debugging
29 
30 namespace Tpetra {
31 namespace Details {
32 
33 //
34 // Search for "SKIP DOWN TO HERE" (omit quotes) for the "public"
35 // interface. I put "public" in quotes because it's public only for
36 // Tpetra developers, NOT for Tpetra users.
37 //
38 
39 namespace Impl {
40 
41 // mfh 01 Feb 2017: Unfortunately,
42 // Teuchos::MatrixMarket::readComplexData requires Teuchos to have
43 // complex arithmetic support enabled. To avoid this issue, I
44 // reimplement the function here. It's not very long.
45 
78 template<class OrdinalType, class RealType>
79 bool
80 readComplexData (std::istream& istr,
81  OrdinalType& rowIndex,
82  OrdinalType& colIndex,
83  RealType& realPart,
84  RealType& imagPart,
85  const std::size_t lineNumber,
86  const bool tolerant)
87 {
88  using ::Teuchos::MatrixMarket::readRealData;
89 
90  RealType the_realPart, the_imagPart;
91  if (! readRealData (istr, rowIndex, colIndex, the_realPart, lineNumber, tolerant)) {
92  if (tolerant) {
93  return false;
94  }
95  else {
96  std::ostringstream os;
97  os << "Failed to read pattern data and/or real value from line "
98  << lineNumber << " of input";
99  throw std::invalid_argument(os.str());
100  }
101  }
102  if (istr.eof ()) {
103  if (tolerant) {
104  return false;
105  }
106  else {
107  std::ostringstream os;
108  os << "No more data after real value on line "
109  << lineNumber << " of input";
110  throw std::invalid_argument (os.str ());
111  }
112  }
113  istr >> the_imagPart;
114  if (istr.fail ()) {
115  if (tolerant) {
116  return false;
117  }
118  else {
119  std::ostringstream os;
120  os << "Failed to get imaginary value from line "
121  << lineNumber << " of input";
122  throw std::invalid_argument (os.str ());
123  }
124  }
125  realPart = the_realPart;
126  imagPart = the_imagPart;
127  return true;
128 }
129 
130 
140 template<class SC,
141  class GO,
142  const bool isComplex = ::Kokkos::ArithTraits<SC>::is_complex>
143 struct ReadLine {
164  static int
165  readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
166  const std::string& line,
167  const std::size_t lineNumber,
168  const bool tolerant = false,
169  std::ostream* errStrm = NULL,
170  const bool debug = false);
171 };
172 
180 template<class SC, class GO>
181 struct ReadLine<SC, GO, true> {
202  static int
203  readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
204  const std::string& line,
205  const std::size_t lineNumber,
206  const bool tolerant = false,
207  std::ostream* errStrm = NULL,
208  const bool debug = false)
209  {
210  using ::Teuchos::MatrixMarket::checkCommentLine;
211  typedef typename ::Kokkos::ArithTraits<SC>::mag_type real_type;
212  using std::endl;
213 
214  GO rowInd, colInd;
215  real_type realPart, imagPart;
216  std::istringstream istr (line);
217  bool success = true;
218  try {
219  // Use the version of this function in this file, not the
220  // version in Teuchos_MatrixMarket_generic.hpp, because the
221  // latter only exists if HAVE_TEUCHOS_COMPLEX is defined.
222  success = readComplexData (istr, rowInd, colInd, realPart, imagPart,
223  lineNumber, tolerant);
224  }
225  catch (std::exception& e) {
226  success = false;
227  if (errStrm != NULL) {
228  std::ostringstream os;
229  os << "readLine: readComplexData threw an exception: " << e.what ()
230  << endl;
231  *errStrm << os.str ();
232  }
233  }
234 
235  if (success) {
236  // if (debug && errStrm != NULL) {
237  // std::ostringstream os;
238  // os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
239  // << ", realPart=" << realPart << ", imagPart=" << imagPart
240  // << std::endl;
241  // *errStrm << os.str ();
242  // }
243  // This line may have side effects.
244  const int errCode =
245  processTriple (rowInd, colInd, SC (realPart, imagPart));
246  if (errCode != 0 && errStrm != NULL) {
247  std::ostringstream os;
248  os << "readLine: processTriple returned " << errCode << " != 0."
249  << endl;
250  *errStrm << os.str ();
251  }
252  return errCode;
253  }
254  else {
255  return -1;
256  }
257  }
258 };
259 
267 template<class SC, class GO>
268 struct ReadLine<SC, GO, false> {
289  static int
290  readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
291  const std::string& line,
292  const std::size_t lineNumber,
293  const bool tolerant = false,
294  std::ostream* errStrm = NULL,
295  const bool debug = false)
296  {
297  using ::Teuchos::MatrixMarket::checkCommentLine;
298  using ::Teuchos::MatrixMarket::readRealData;
299  using std::endl;
300 
301  GO rowInd, colInd;
302  SC val;
303  std::istringstream istr (line);
304  bool success = true;
305  try {
306  success = readRealData (istr, rowInd, colInd, val,
307  lineNumber, tolerant);
308  }
309  catch (std::exception& e) {
310  success = false;
311  if (errStrm != NULL) {
312  std::ostringstream os;
313  os << "readLine: readRealData threw an exception: " << e.what ()
314  << endl;
315  *errStrm << os.str ();
316  }
317  }
318 
319  if (success) {
320  if (debug && errStrm != NULL) {
321  std::ostringstream os;
322  os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
323  << ", val=" << val << std::endl;
324  *errStrm << os.str ();
325  }
326  // This line may have side effects.
327  const int errCode = processTriple (rowInd, colInd, val);
328  if (errCode != 0 && errStrm != NULL) {
329  std::ostringstream os;
330  os << "readLine: processTriple returned " << errCode << " != 0."
331  << endl;
332  *errStrm << os.str ();
333  }
334  return errCode;
335  }
336  else {
337  return -1;
338  }
339  }
340 };
341 
367 template<class SC, class GO>
368 int
369 readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
370  const std::string& line,
371  const std::size_t lineNumber,
372  const bool tolerant = false,
373  std::ostream* errStrm = NULL,
374  const bool debug = false)
375 {
376  return ReadLine<SC, GO>::readLine (processTriple, line, lineNumber,
377  tolerant, errStrm, debug);
378 }
379 
410 template<class SC, class GO>
411 int
412 readTriples (std::istream& inputStream,
413  std::size_t& curLineNum,
414  std::size_t& numTriplesRead,
415  std::function<int (const GO, const GO, const SC&)> processTriple,
416  const std::size_t maxNumTriplesToRead,
417  const bool tolerant = false,
418  std::ostream* errStrm = NULL,
419  const bool debug = false)
420 {
421  using Teuchos::MatrixMarket::checkCommentLine;
422  using std::endl;
423  using std::size_t;
424 
425  numTriplesRead = 0; // output argument only
426  if (inputStream.eof ()) {
427  return 0; // no error, just nothing left to read
428  }
429  else if (inputStream.fail ()) {
430  if (errStrm != NULL) {
431  *errStrm << "Input stream reports a failure (not the same as "
432  "end-of-file)." << endl;
433  }
434  return -1;
435  }
436 
437  std::string line;
438  std::vector<size_t> badLineNumbers;
439  int errCode = 0; // 0 means success
440 
441  bool inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
442  ++curLineNum; // we read the line; we can't put it back
443  while (inputStreamCanStillBeRead && numTriplesRead < maxNumTriplesToRead) {
444  // if (debug && errStrm != NULL) {
445  // std::ostringstream os;
446  // os << "readTriples: Got line: \"" << line << "\"" << std::endl;
447  // *errStrm << os.str ();
448  // }
449  size_t start, size;
450 
451  const bool isCommentLine =
452  checkCommentLine (line, start, size, curLineNum, tolerant);
453  if (isCommentLine) {
454  // Move on to the next line, if there is a next line.
455  inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
456  ++curLineNum; // we read another line; we can't put it back
457  continue; // move on to the next line
458  }
459  else { // not a comment line; should have a sparse matrix entry
460  const std::string theLine = line.substr (start, size);
461  // If the line has a valid sparse matrix entry, extract it and
462  // hand it off to the processTriple closure.
463  const int curErrCode =
464  readLine (processTriple, theLine, curLineNum, tolerant, errStrm, debug);
465  if (curErrCode != 0) {
466  errCode = curErrCode;
467  badLineNumbers.push_back (curLineNum);
468  }
469  else {
470  ++numTriplesRead;
471  }
472  if (numTriplesRead < maxNumTriplesToRead) {
473  inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
474  }
475  }
476  } // while there are lines to read and we need more triples
477 
478  if (errCode != 0 && errStrm != NULL) {
479  const size_t numBadLines = badLineNumbers.size ();
480  *errStrm << "Encountered " << numBadLines << " bad line"
481  << (numBadLines != size_t (1) ? "s" : "")
482  << ": [";
483  for (size_t k = 0; k < numBadLines; ++k) {
484  *errStrm << badLineNumbers[k];
485  if (k + 1 < numBadLines) {
486  *errStrm << ", ";
487  }
488  }
489  *errStrm << "]" << endl;
490  }
491  if (! inputStream.eof () && inputStream.fail ()) {
492  if (errCode == 0) {
493  errCode = -1;
494  }
495  if (errStrm != NULL) {
496  *errStrm << "The input stream is not at end-of-file, "
497  "but is in a bad state." << endl;
498  }
499  }
500  return errCode;
501 }
502 
528 template<class SC, class GO>
529 int
530 readAndSendOneBatchOfTriples (std::istream& inputStream,
531  std::size_t& curLineNum,
532  std::size_t& numEntRead,
533  ::Teuchos::ArrayRCP<int>& sizeBuf,
534  ::Teuchos::ArrayRCP<char>& msgBuf,
535  std::vector<GO>& rowInds,
536  std::vector<GO>& colInds,
537  std::vector<SC>& vals,
538  const std::size_t maxNumEntPerMsg,
539  const int destRank,
540  const ::Teuchos::Comm<int>& comm,
541  const bool tolerant = false,
542  std::ostream* errStrm = NULL,
543  const bool debug = false)
544 {
545  using ::Tpetra::Details::countPackTriplesCount;
546  using ::Tpetra::Details::countPackTriples;
547  using ::Tpetra::Details::packTriplesCount;
548  using ::Tpetra::Details::packTriples;
549  using ::Teuchos::isend;
550  using std::endl;
551 
552  using ::Kokkos::ArithTraits;
553  // constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
554  // constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
555  constexpr int sizeTag = 42;
556  constexpr int msgTag = 43;
557  //constexpr int srcRank = 0;
558  int errCode = 0;
559 
560  // This doesn't actually deallocate memory; it just changes the size
561  // back to zero, so that push_back starts over from the beginning.
562  rowInds.resize (0);
563  colInds.resize (0);
564  vals.resize (0);
565  // Closure that adds the new matrix entry to the above temp arrays.
566  auto processTriple = [&rowInds, &colInds, &vals]
567  (const GO rowInd, const GO colInd, const SC& val) {
568  try {
569  rowInds.push_back (rowInd);
570  colInds.push_back (colInd);
571  vals.push_back (val);
572  }
573  catch (...) {
574  return -1;
575  }
576  return 0;
577  };
578  numEntRead = 0; // output argument
579  errCode = readTriples<SC, GO> (inputStream, curLineNum, numEntRead,
580  processTriple, maxNumEntPerMsg, tolerant,
581  errStrm, debug);
582  if (debug && errStrm != NULL) {
583  std::ostringstream os;
584  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
585  << ", GO=" << typeid (GO).name () << ": "
586  << "readAndSendOneBatchOfTriples: readTriples read "
587  << numEntRead << " matrix entries, and returned errCode="
588  << errCode << "." << std::endl;
589  *errStrm << os.str ();
590  }
591  if (numEntRead != rowInds.size () ||
592  numEntRead != colInds.size () ||
593  numEntRead != vals.size ()) {
594  if (errStrm != NULL) {
595  *errStrm << "readTriples size results are not consistent. "
596  << "numEntRead = " << numEntRead
597  << ", rowInds.size() = " << rowInds.size ()
598  << ", colInds.size() = " << colInds.size ()
599  << ", and vals.size() = " << vals.size () << "."
600  << std::endl;
601  }
602  if (errCode == 0) {
603  errCode = -1;
604  }
605  }
606 
607  // We don't consider reading having "failed" if we've reached
608  // end-of-file before reading maxNumEntPerMsg entries. It's OK if
609  // we got fewer triples than that. Furthermore, we have to send at
610  // least one message to the destination process, even if the read
611  // from the file failed.
612 
613  if (numEntRead == 0 || errCode != 0) {
614  // Send a message size of zero to the receiving process, to tell
615  // it that we have no triples to send, or that there was an error
616  // reading. The latter just means that we "go through the
617  // motions," then broadcast the error code.
618  sizeBuf[0] = 0;
619  if (debug && errStrm != NULL) {
620  std::ostringstream os;
621  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
622  << ", GO=" << typeid (GO).name () << ": "
623  << "Post send (size=0, errCode=" << errCode << ") "
624  << "to " << destRank << " with tag " << sizeTag << endl;
625  *errStrm << os.str ();
626  }
627  send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
628  return errCode;
629  }
630  else { // we read a nonzero # of triples, without error
631  const int numEnt = static_cast<int> (numEntRead);
632  int countSize = 0; // output argument
633  int triplesSize = 0; // output argument
634 
635  errCode = countPackTriplesCount (comm, countSize, errStrm);
636  // countSize should never be nonpositive, since we have to pack an
637  // integer size.
638  if (countSize <= 0 && errCode == 0) {
639  errCode = -1;
640  }
641 
642  if (errCode != 0) {
643  // Send zero to the receiving process, to tell it about the error.
644  sizeBuf[0] = 0;
645  if (debug && errStrm != NULL) {
646  std::ostringstream os;
647  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
648  << ", GO=" << typeid (GO).name () << ": "
649  << "Post send (size=0, error case) to " << destRank
650  << " with tag " << sizeTag << endl;
651  *errStrm << os.str ();
652  }
653  send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
654  return errCode;
655  }
656  else { // countPackTriplesCount succeeded
657  errCode = countPackTriples<SC, GO> (numEnt, comm, triplesSize, errStrm);
658  if (errCode != 0) {
659  // Send a message size of zero to the receiving process, to
660  // tell it that there was an error counting.
661  sizeBuf[0] = 0;
662  if (debug && errStrm != NULL) {
663  std::ostringstream os;
664  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
665  << ", GO=" << typeid (GO).name () << ": "
666  << "Post send (size=0, error case) to " << destRank
667  << " with tag " << sizeTag << endl;
668  *errStrm << os.str ();
669  }
670  send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
671  return errCode;
672  }
673  else { // countPackTriples succeeded; message packed & ready to send
674  // Send the message size (in bytes). We can use a nonblocking
675  // send here, and try to overlap with message packing.
676  const int outBufSize = countSize + triplesSize;
677  sizeBuf[0] = outBufSize;
678  if (debug && errStrm != NULL) {
679  std::ostringstream os;
680  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
681  << ", GO=" << typeid (GO).name () << ": "
682  << "Post isend (size=" << sizeBuf[0] << ") to " << destRank
683  << " with tag " << sizeTag << endl;
684  *errStrm << os.str ();
685  }
686  auto sizeReq = isend<int, int> (sizeBuf, destRank, sizeTag, comm);
687 
688  msgBuf.resize (outBufSize);
689  char* outBuf = msgBuf.getRawPtr ();
690 
691  // If anything goes wrong with packing, send the pack buffer
692  // anyway, since the receiving process expects a message.
693  int outBufCurPos = 0; // input/output argument
694  errCode = packTriplesCount (numEnt, outBuf, outBufSize,
695  outBufCurPos, comm, errStrm);
696  if (errCode == 0) {
697  errCode = packTriples<SC, GO> (rowInds.data (), colInds.data (),
698  vals.data (), numEnt, outBuf,
699  outBufSize, outBufCurPos, comm,
700  errStrm);
701  }
702  if (debug && errStrm != NULL) {
703  std::ostringstream os;
704  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
705  << ", GO=" << typeid (GO).name () << ": "
706  << "Post isend (packed data) to " << destRank
707  << " with tag " << msgTag << endl;
708  *errStrm << os.str ();
709  }
710  auto msgReq = isend<int, char> (msgBuf, destRank, msgTag, comm);
711 
712  // Wait on the two messages. It doesn't matter in what order
713  // we send them, because they have different tags. The
714  // receiving process will wait on the first message first, in
715  // order to get the size of the second message.
716  if (debug && errStrm != NULL) {
717  std::ostringstream os;
718  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
719  << ", GO=" << typeid (GO).name () << ": "
720  << "Wait on isend (size)" << endl;
721  *errStrm << os.str ();
722  }
723  sizeReq->wait ();
724  if (debug && errStrm != NULL) {
725  std::ostringstream os;
726  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
727  << ", GO=" << typeid (GO).name () << ": "
728  << "Wait on isend (packed data)" << endl;
729  *errStrm << os.str ();
730  }
731  msgReq->wait ();
732 
733  // This doesn't actually deallocate; it just resets sizes to zero.
734  rowInds.clear ();
735  colInds.clear ();
736  vals.clear ();
737  }
738  }
739  }
740  return errCode;
741 }
742 
750 
778 template<class SC, class GO, class CommRequestPtr>
779 int
780 recvOneBatchOfTriples (std::vector<GO>& rowInds,
781  std::vector<GO>& colInds,
782  std::vector<SC>& vals,
783  int& numEnt,
784  ::Teuchos::ArrayRCP<int>& sizeBuf,
785  ::Teuchos::ArrayRCP<char>& msgBuf,
786  CommRequestPtr& sizeReq,
787  const int srcRank,
788  const ::Teuchos::Comm<int>& comm,
789  const bool tolerant = false,
790  std::ostream* errStrm = NULL,
791  const bool debug = false)
792 {
793  using ::Tpetra::Details::unpackTriplesCount;
794  using ::Tpetra::Details::unpackTriples;
795  using ::Kokkos::ArithTraits;
796 
798  //constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
799  //constexpr int sizeTag = 42;
800  constexpr int msgTag = 43;
801  int errCode = 0; // return value
802  numEnt = 0; // output argument
803 
804  // Wait on the ireceive we preposted before calling this function.
805  if (debug && errStrm != NULL) {
806  std::ostringstream os;
807  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
808  << ", GO=" << typeid (GO).name () << ": "
809  << "Wait on irecv (size)" << std::endl;
810  *errStrm << os.str ();
811  }
812  sizeReq->wait ();
813  sizeReq = CommRequestPtr (NULL);
814  const int inBufSize = sizeBuf[0];
815  if (debug && errStrm != NULL) {
816  std::ostringstream os;
817  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
818  << ", GO=" << typeid (GO).name () << ": "
819  << "Received size: sizeBuf[0]=" << sizeBuf[0] << std::endl;
820  *errStrm << os.str ();
821  }
822 
823  if (inBufSize == 0) {
824  numEnt = 0;
825  rowInds.resize (0);
826  colInds.resize (0);
827  vals.resize (0);
828  }
829  else {
830  msgBuf.resize (inBufSize);
831  char* inBuf = msgBuf.getRawPtr ();
832 
833  if (debug && errStrm != NULL) {
834  std::ostringstream os;
835  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
836  << ", GO=" << typeid (GO).name () << ": "
837  << "Post irecv (packed data) " << "from " << srcRank
838  << " with tag " << msgTag << std::endl;
839  *errStrm << os.str ();
840  }
841  auto msgReq = ::Teuchos::ireceive (msgBuf, srcRank, msgTag, comm);
842  if (debug && errStrm != NULL) {
843  std::ostringstream os;
844  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
845  << ", GO=" << typeid (GO).name () << ": "
846  << "Wait on irecv (packed data)" << std::endl;
847  *errStrm << os.str ();
848  }
849  msgReq->wait ();
850 
851  int inBufCurPos = 0; // output argument
852  errCode = unpackTriplesCount (inBuf, inBufSize, inBufCurPos,
853  numEnt, comm, errStrm);
854  if (errCode == 0) {
855  rowInds.resize (numEnt);
856  colInds.resize (numEnt);
857  vals.resize (numEnt);
858  errCode = unpackTriples<SC, GO> (inBuf, inBufSize, inBufCurPos,
859  rowInds.data (), colInds.data (),
860  vals.data (), numEnt, comm, errStrm);
861  }
862  }
863  return errCode;
864 }
865 
866 } // namespace Impl
867 
868 //
869 // SKIP DOWN TO HERE FOR "PUBLIC" INTERFACE
870 //
871 
905 template<class SC, class GO>
906 int
907 readAndDealOutTriples (std::istream& inputStream, // only valid on Proc 0
908  std::size_t& curLineNum, // only valid on Proc 0
909  std::size_t& totalNumEntRead, // only valid on Proc 0
910  std::function<int (const GO, const GO, const SC&)> processTriple,
911  const std::size_t maxNumEntPerMsg,
912  const ::Teuchos::Comm<int>& comm,
913  const bool tolerant = false,
914  std::ostream* errStrm = NULL,
915  const bool debug = false)
916 {
917  using Kokkos::ArithTraits;
918  using std::endl;
919  using std::size_t;
920 
921  constexpr int srcRank = 0;
922  //constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
924  constexpr int sizeTag = 42;
925  //constexpr int msgTag = 43;
926  const int myRank = comm.getRank ();
927  const int numProcs = comm.getSize ();
928  int errCode = 0;
929 
930  ::Teuchos::ArrayRCP<int> sizeBuf (1);
931  ::Teuchos::ArrayRCP<char> msgBuf; // to be resized as needed
932 
933  // Temporary storage for reading & packing (on Process srcRank) or
934  // unpacking (every other process) triples.
935  std::vector<GO> rowInds;
936  std::vector<GO> colInds;
937  std::vector<SC> vals;
938  rowInds.reserve (maxNumEntPerMsg);
939  colInds.reserve (maxNumEntPerMsg);
940  vals.reserve (maxNumEntPerMsg);
941 
942  totalNumEntRead = 0;
943  if (myRank == srcRank) {
944  // Loop around through all the processes, including this one, over
945  // and over until we reach the end of the file, or an error occurs.
946  int destRank = 0;
947  bool lastMessageWasLegitZero = false;
948  for ( ;
949  ! inputStream.eof () && errCode == 0;
950  destRank = (destRank + 1) % numProcs) {
951 
952  size_t curNumEntRead = 0; // output argument of below
953  if (destRank == srcRank) {
954  // We can read and process the triples directly. We don't
955  // need to use intermediate storage, because we don't need to
956  // pack and send the triples.
957  const int readErrCode =
958  Impl::readTriples<SC, GO> (inputStream, curLineNum, curNumEntRead,
959  processTriple, maxNumEntPerMsg, tolerant,
960  errStrm, debug);
961  if (debug && errStrm != NULL) {
962  std::ostringstream os;
963  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
964  << ", GO=" << typeid (GO).name () << ": "
965  << "(dest=src) readTriples returned curNumEntRead="
966  << curNumEntRead << ", errCode=" << readErrCode << endl;
967  *errStrm << os.str ();
968  }
969  errCode = (readErrCode != 0) ? readErrCode : errCode;
970  }
971  else {
972  if (false && debug && errStrm != NULL) {
973  std::ostringstream os;
974  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
975  << ", GO=" << typeid (GO).name () << ": "
976  << "Calling readAndSend... with destRank=" << destRank << endl;
977  *errStrm << os.str ();
978  }
979  // Read, pack, and send the triples to destRank.
980  const int readAndSendErrCode =
981  Impl::readAndSendOneBatchOfTriples<SC, GO> (inputStream, curLineNum,
982  curNumEntRead,
983  sizeBuf, msgBuf,
984  rowInds, colInds, vals,
985  maxNumEntPerMsg, destRank,
986  comm, tolerant, errStrm,
987  debug);
988  totalNumEntRead += curNumEntRead;
989  if (debug && errStrm != NULL) {
990  std::ostringstream os;
991  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
992  << ", GO=" << typeid (GO).name () << ": "
993  << "readAndSend... with destRank=" << destRank
994  << " returned curNumEntRead=" << curNumEntRead
995  << ", errCode=" << readAndSendErrCode << endl;
996  *errStrm << os.str ();
997  }
998  errCode = (readAndSendErrCode != 0) ? readAndSendErrCode : errCode;
999  if (readAndSendErrCode == 0 && curNumEntRead == 0) {
1000  lastMessageWasLegitZero = true;
1001  if (debug && errStrm != NULL) {
1002  std::ostringstream os;
1003  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1004  << ", GO=" << typeid (GO).name () << ": "
1005  << "Last send to " << destRank << " with tag " << sizeTag
1006  << " was legit zero, counts as termination" << endl;
1007  *errStrm << os.str ();
1008  }
1009  }
1010  }
1011  } // loop around through processes until done reading file, or error
1012 
1013  // Loop around through the remaining processes, and tell them that
1014  // we're done, by sending zero. If the last message we sent to
1015  // destRank was zero, then skip that process, since it only
1016  // expects one message of size zero. Note that destRank got
1017  // incremented mod numProcs at end of loop, so we have to
1018  // decrement it mod numProcs.
1019  destRank = (destRank - 1) % numProcs;
1020  if (destRank < 0) { // C mod operator does not promise positivity
1021  destRank = destRank + numProcs;
1022  }
1023 
1024  const int startRank = lastMessageWasLegitZero ? (destRank+1) : destRank;
1025  for (int outRank = startRank; outRank < numProcs; ++outRank) {
1026  if (outRank != srcRank) {
1027  if (debug && errStrm != NULL) {
1028  std::ostringstream os;
1029  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1030  << ", GO=" << typeid (GO).name () << ": "
1031  << "Post send (size, termination msg) to " << outRank
1032  << " with tag " << sizeTag << "(was last message legit zero? "
1033  << (lastMessageWasLegitZero ? "true" : "false") << ")" << endl;
1034  *errStrm << os.str ();
1035  }
1036  sizeBuf[0] = 0;
1037  ::Teuchos::send (sizeBuf.getRawPtr (), 1, outRank, sizeTag, comm);
1038  }
1039  }
1040  }
1041  else {
1042  while (true) {
1043  // Prepost a message to receive the size (in bytes) of the
1044  // incoming packet.
1045  sizeBuf[0] = 0; // superfluous, but safe
1046  if (debug && errStrm != NULL) {
1047  std::ostringstream os;
1048  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1049  << ", GO=" << typeid (GO).name () << ": "
1050  << "Post irecv (size) from " << srcRank
1051  << " with tag " << sizeTag << std::endl;
1052  *errStrm << os.str ();
1053  }
1054  auto sizeReq = ::Teuchos::ireceive (sizeBuf, srcRank, sizeTag, comm);
1055 
1056  int numEnt = 0; // output argument
1057  const int recvErrCode =
1058  Impl::recvOneBatchOfTriples (rowInds, colInds, vals, numEnt, sizeBuf,
1059  msgBuf, sizeReq, srcRank, comm, tolerant,
1060  errStrm, debug);
1061  if (debug && errStrm != NULL) {
1062  std::ostringstream os;
1063  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1064  << ", GO=" << typeid (GO).name () << ": "
1065  << "recvOneBatchOfTriples returned numEnt=" << numEnt
1066  << ", errCode=" << recvErrCode << endl;
1067  *errStrm << os.str ();
1068  }
1069  errCode = (recvErrCode != 0) ? recvErrCode : errCode;
1070 
1071  if (numEnt != static_cast<int> (rowInds.size ()) ||
1072  numEnt != static_cast<int> (colInds.size ()) ||
1073  numEnt != static_cast<int> (vals.size ())) {
1074  errCode = (errCode == 0) ? -1 : errCode;
1075  if (errStrm != NULL) {
1076  *errStrm << "recvOneBatchOfTriples produced inconsistent data sizes. "
1077  << "numEnt = " << numEnt
1078  << ", rowInds.size() = " << rowInds.size ()
1079  << ", colInds.size() = " << colInds.size ()
1080  << ", vals.size() = " << vals.size () << "."
1081  << endl;
1082  }
1083  } // if sizes inconsistent
1084 
1085  // Sending zero items is how Process srcRank tells this process
1086  // that it (Process srcRank) is done sending out data.
1087  if (numEnt == 0) {
1088  break;
1089  }
1090 
1091  for (int k = 0; k < numEnt && errCode == 0; ++k) {
1092  const int curErrCode = processTriple (rowInds[k], colInds[k], vals[k]);
1093  errCode = (curErrCode == 0) ? errCode : curErrCode;
1094  }
1095  } // while we still get messages from srcRank
1096  }
1097 
1098  if (debug && errStrm != NULL) {
1099  std::ostringstream os;
1100  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1101  << ", GO=" << typeid (GO).name () << ": "
1102  << "Done with send/recv loop" << endl;
1103  *errStrm << os.str ();
1104  }
1105  // Do a bitwise OR to get an error code that is nonzero if and only
1106  // if any process' local error code is nonzero.
1107  using ::Teuchos::outArg;
1108  using ::Teuchos::REDUCE_BOR;
1109  using ::Teuchos::reduceAll;
1110  const int lclErrCode = errCode;
1111  reduceAll<int, int> (comm, REDUCE_BOR, lclErrCode, outArg (errCode));
1112  return errCode;
1113 }
1114 
1115 } // namespace Details
1116 } // namespace Tpetra
1117 
1118 #endif // TPETRA_DETAILS_READTRIPLES_HPP
int packTriplesCount(const int, char[], const int, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Pack the count (number) of matrix triples.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
int readAndDealOutTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &totalNumEntRead, std::function< int(const GO, const GO, const SC &)> processTriple, const std::size_t maxNumEntPerMsg, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
On Process 0 in the given communicator, read sparse matrix entries (in chunks of at most maxNumEntPer...
Implementation of the readLine stand-alone function in this namespace (see below).
int unpackTriplesCount(const char[], const int, int &, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Unpack just the count of triples from the given input buffer.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
int countPackTriplesCount(const ::Teuchos::Comm< int > &, int &size, std::ostream *errStrm)
Compute the buffer size required by packTriples for packing the number of matrix entries ("triples")...
void start()
Start the deep_copy counter.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...