Tpetra parallel linear algebra  Version of the Day
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Tpetra_Details_ReadTriples.hpp
Go to the documentation of this file.
1 // @HEADER
2 // ***********************************************************************
3 //
4 // Tpetra: Templated Linear Algebra Services Package
5 // Copyright (2008) Sandia Corporation
6 //
7 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
8 // the U.S. Government retains certain rights in this software.
9 //
10 // Redistribution and use in source and binary forms, with or without
11 // modification, are permitted provided that the following conditions are
12 // met:
13 //
14 // 1. Redistributions of source code must retain the above copyright
15 // notice, this list of conditions and the following disclaimer.
16 //
17 // 2. Redistributions in binary form must reproduce the above copyright
18 // notice, this list of conditions and the following disclaimer in the
19 // documentation and/or other materials provided with the distribution.
20 //
21 // 3. Neither the name of the Corporation nor the names of the
22 // contributors may be used to endorse or promote products derived from
23 // this software without specific prior written permission.
24 //
25 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
26 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
28 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
29 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
30 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
31 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
32 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
33 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
34 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
35 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 //
37 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
38 //
39 // ************************************************************************
40 // @HEADER
41 
42 #ifndef TPETRA_DETAILS_READTRIPLES_HPP
43 #define TPETRA_DETAILS_READTRIPLES_HPP
44 
53 
54 #include "TpetraCore_config.h"
55 #include "Tpetra_Details_PackTriples.hpp"
56 #include "Kokkos_ArithTraits.hpp"
57 #include "Teuchos_MatrixMarket_generic.hpp"
58 #include "Teuchos_CommHelpers.hpp"
59 #include <iostream>
60 #include <typeinfo> // for debugging
61 
62 namespace Tpetra {
63 namespace Details {
64 
65 //
66 // Search for "SKIP DOWN TO HERE" (omit quotes) for the "public"
67 // interface. I put "public" in quotes because it's public only for
68 // Tpetra developers, NOT for Tpetra users.
69 //
70 
71 namespace Impl {
72 
73 // mfh 01 Feb 2017: Unfortunately,
74 // Teuchos::MatrixMarket::readComplexData requires Teuchos to have
75 // complex arithmetic support enabled. To avoid this issue, I
76 // reimplement the function here. It's not very long.
77 
110 template<class OrdinalType, class RealType>
111 bool
112 readComplexData (std::istream& istr,
113  OrdinalType& rowIndex,
114  OrdinalType& colIndex,
115  RealType& realPart,
116  RealType& imagPart,
117  const std::size_t lineNumber,
118  const bool tolerant)
119 {
120  using ::Teuchos::MatrixMarket::readRealData;
121 
122  RealType the_realPart, the_imagPart;
123  if (! readRealData (istr, rowIndex, colIndex, the_realPart, lineNumber, tolerant)) {
124  if (tolerant) {
125  return false;
126  }
127  else {
128  std::ostringstream os;
129  os << "Failed to read pattern data and/or real value from line "
130  << lineNumber << " of input";
131  throw std::invalid_argument(os.str());
132  }
133  }
134  if (istr.eof ()) {
135  if (tolerant) {
136  return false;
137  }
138  else {
139  std::ostringstream os;
140  os << "No more data after real value on line "
141  << lineNumber << " of input";
142  throw std::invalid_argument (os.str ());
143  }
144  }
145  istr >> the_imagPart;
146  if (istr.fail ()) {
147  if (tolerant) {
148  return false;
149  }
150  else {
151  std::ostringstream os;
152  os << "Failed to get imaginary value from line "
153  << lineNumber << " of input";
154  throw std::invalid_argument (os.str ());
155  }
156  }
157  realPart = the_realPart;
158  imagPart = the_imagPart;
159  return true;
160 }
161 
162 
172 template<class SC,
173  class GO,
174  const bool isComplex = ::Kokkos::Details::ArithTraits<SC>::is_complex>
175 struct ReadLine {
196  static int
197  readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
198  const std::string& line,
199  const std::size_t lineNumber,
200  const bool tolerant = false,
201  std::ostream* errStrm = NULL,
202  const bool debug = false);
203 };
204 
212 template<class SC, class GO>
213 struct ReadLine<SC, GO, true> {
234  static int
235  readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
236  const std::string& line,
237  const std::size_t lineNumber,
238  const bool tolerant = false,
239  std::ostream* errStrm = NULL,
240  const bool debug = false)
241  {
242  using ::Teuchos::MatrixMarket::checkCommentLine;
243  typedef typename ::Kokkos::Details::ArithTraits<SC>::mag_type real_type;
244  using std::endl;
245 
246  GO rowInd, colInd;
247  real_type realPart, imagPart;
248  std::istringstream istr (line);
249  bool success = true;
250  try {
251  // Use the version of this function in this file, not the
252  // version in Teuchos_MatrixMarket_generic.hpp, because the
253  // latter only exists if HAVE_TEUCHOS_COMPLEX is defined.
254  success = readComplexData (istr, rowInd, colInd, realPart, imagPart,
255  lineNumber, tolerant);
256  }
257  catch (std::exception& e) {
258  success = false;
259  if (errStrm != NULL) {
260  std::ostringstream os;
261  os << "readLine: readComplexData threw an exception: " << e.what ()
262  << endl;
263  *errStrm << os.str ();
264  }
265  }
266 
267  if (success) {
268  // if (debug && errStrm != NULL) {
269  // std::ostringstream os;
270  // os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
271  // << ", realPart=" << realPart << ", imagPart=" << imagPart
272  // << std::endl;
273  // *errStrm << os.str ();
274  // }
275  // This line may have side effects.
276  const int errCode =
277  processTriple (rowInd, colInd, SC (realPart, imagPart));
278  if (errCode != 0 && errStrm != NULL) {
279  std::ostringstream os;
280  os << "readLine: processTriple returned " << errCode << " != 0."
281  << endl;
282  *errStrm << os.str ();
283  }
284  return errCode;
285  }
286  else {
287  return -1;
288  }
289  }
290 };
291 
299 template<class SC, class GO>
300 struct ReadLine<SC, GO, false> {
321  static int
322  readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
323  const std::string& line,
324  const std::size_t lineNumber,
325  const bool tolerant = false,
326  std::ostream* errStrm = NULL,
327  const bool debug = false)
328  {
329  using ::Teuchos::MatrixMarket::checkCommentLine;
330  using ::Teuchos::MatrixMarket::readRealData;
331  using std::endl;
332 
333  GO rowInd, colInd;
334  SC val;
335  std::istringstream istr (line);
336  bool success = true;
337  try {
338  success = readRealData (istr, rowInd, colInd, val,
339  lineNumber, tolerant);
340  }
341  catch (std::exception& e) {
342  success = false;
343  if (errStrm != NULL) {
344  std::ostringstream os;
345  os << "readLine: readRealData threw an exception: " << e.what ()
346  << endl;
347  *errStrm << os.str ();
348  }
349  }
350 
351  if (success) {
352  if (debug && errStrm != NULL) {
353  std::ostringstream os;
354  os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
355  << ", val=" << val << std::endl;
356  *errStrm << os.str ();
357  }
358  // This line may have side effects.
359  const int errCode = processTriple (rowInd, colInd, val);
360  if (errCode != 0 && errStrm != NULL) {
361  std::ostringstream os;
362  os << "readLine: processTriple returned " << errCode << " != 0."
363  << endl;
364  *errStrm << os.str ();
365  }
366  return errCode;
367  }
368  else {
369  return -1;
370  }
371  }
372 };
373 
399 template<class SC, class GO>
400 int
401 readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
402  const std::string& line,
403  const std::size_t lineNumber,
404  const bool tolerant = false,
405  std::ostream* errStrm = NULL,
406  const bool debug = false)
407 {
408  return ReadLine<SC, GO>::readLine (processTriple, line, lineNumber,
409  tolerant, errStrm, debug);
410 }
411 
442 template<class SC, class GO>
443 int
444 readTriples (std::istream& inputStream,
445  std::size_t& curLineNum,
446  std::size_t& numTriplesRead,
447  std::function<int (const GO, const GO, const SC&)> processTriple,
448  const std::size_t maxNumTriplesToRead,
449  const bool tolerant = false,
450  std::ostream* errStrm = NULL,
451  const bool debug = false)
452 {
453  using Teuchos::MatrixMarket::checkCommentLine;
454  using std::endl;
455  using std::size_t;
456 
457  numTriplesRead = 0; // output argument only
458  if (inputStream.eof ()) {
459  return 0; // no error, just nothing left to read
460  }
461  else if (inputStream.fail ()) {
462  if (errStrm != NULL) {
463  *errStrm << "Input stream reports a failure (not the same as "
464  "end-of-file)." << endl;
465  }
466  return -1;
467  }
468 
469  std::string line;
470  std::vector<size_t> badLineNumbers;
471  int errCode = 0; // 0 means success
472 
473  bool inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
474  ++curLineNum; // we read the line; we can't put it back
475  while (inputStreamCanStillBeRead && numTriplesRead < maxNumTriplesToRead) {
476  // if (debug && errStrm != NULL) {
477  // std::ostringstream os;
478  // os << "readTriples: Got line: \"" << line << "\"" << std::endl;
479  // *errStrm << os.str ();
480  // }
481  size_t start, size;
482 
483  const bool isCommentLine =
484  checkCommentLine (line, start, size, curLineNum, tolerant);
485  if (isCommentLine) {
486  // Move on to the next line, if there is a next line.
487  inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
488  ++curLineNum; // we read another line; we can't put it back
489  continue; // move on to the next line
490  }
491  else { // not a comment line; should have a sparse matrix entry
492  const std::string theLine = line.substr (start, size);
493  // If the line has a valid sparse matrix entry, extract it and
494  // hand it off to the processTriple closure.
495  const int curErrCode =
496  readLine (processTriple, theLine, curLineNum, tolerant, errStrm, debug);
497  if (curErrCode != 0) {
498  errCode = curErrCode;
499  badLineNumbers.push_back (curLineNum);
500  }
501  else {
502  ++numTriplesRead;
503  }
504  if (numTriplesRead < maxNumTriplesToRead) {
505  inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
506  }
507  }
508  } // while there are lines to read and we need more triples
509 
510  if (errCode != 0 && errStrm != NULL) {
511  const size_t numBadLines = badLineNumbers.size ();
512  *errStrm << "Encountered " << numBadLines << " bad line"
513  << (numBadLines != size_t (1) ? "s" : "")
514  << ": [";
515  for (size_t k = 0; k < numBadLines; ++k) {
516  *errStrm << badLineNumbers[k];
517  if (k + 1 < numBadLines) {
518  *errStrm << ", ";
519  }
520  }
521  *errStrm << "]" << endl;
522  }
523  if (! inputStream.eof () && inputStream.fail ()) {
524  if (errCode == 0) {
525  errCode = -1;
526  }
527  if (errStrm != NULL) {
528  *errStrm << "The input stream is not at end-of-file, "
529  "but is in a bad state." << endl;
530  }
531  }
532  return errCode;
533 }
534 
560 template<class SC, class GO>
561 int
562 readAndSendOneBatchOfTriples (std::istream& inputStream,
563  std::size_t& curLineNum,
564  std::size_t& numEntRead,
565  ::Teuchos::ArrayRCP<int>& sizeBuf,
566  ::Teuchos::ArrayRCP<char>& msgBuf,
567  std::vector<GO>& rowInds,
568  std::vector<GO>& colInds,
569  std::vector<SC>& vals,
570  const std::size_t maxNumEntPerMsg,
571  const int destRank,
572  const ::Teuchos::Comm<int>& comm,
573  const bool tolerant = false,
574  std::ostream* errStrm = NULL,
575  const bool debug = false)
576 {
577  using ::Tpetra::Details::countPackTriplesCount;
578  using ::Tpetra::Details::countPackTriples;
579  using ::Tpetra::Details::packTriplesCount;
580  using ::Tpetra::Details::packTriples;
581  using ::Teuchos::isend;
582  using std::endl;
583 
584  using ::Kokkos::ArithTraits;
585  // constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
586  // constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
587  constexpr int sizeTag = 42;
588  constexpr int msgTag = 43;
589  //constexpr int srcRank = 0;
590  int errCode = 0;
591 
592  // This doesn't actually deallocate memory; it just changes the size
593  // back to zero, so that push_back starts over from the beginning.
594  rowInds.resize (0);
595  colInds.resize (0);
596  vals.resize (0);
597  // Closure that adds the new matrix entry to the above temp arrays.
598  auto processTriple = [&rowInds, &colInds, &vals]
599  (const GO rowInd, const GO colInd, const SC& val) {
600  try {
601  rowInds.push_back (rowInd);
602  colInds.push_back (colInd);
603  vals.push_back (val);
604  }
605  catch (...) {
606  return -1;
607  }
608  return 0;
609  };
610  numEntRead = 0; // output argument
611  errCode = readTriples<SC, GO> (inputStream, curLineNum, numEntRead,
612  processTriple, maxNumEntPerMsg, tolerant,
613  errStrm, debug);
614  if (debug && errStrm != NULL) {
615  std::ostringstream os;
616  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
617  << ", GO=" << typeid (GO).name () << ": "
618  << "readAndSendOneBatchOfTriples: readTriples read "
619  << numEntRead << " matrix entries, and returned errCode="
620  << errCode << "." << std::endl;
621  *errStrm << os.str ();
622  }
623  if (numEntRead != rowInds.size () ||
624  numEntRead != colInds.size () ||
625  numEntRead != vals.size ()) {
626  if (errStrm != NULL) {
627  *errStrm << "readTriples size results are not consistent. "
628  << "numEntRead = " << numEntRead
629  << ", rowInds.size() = " << rowInds.size ()
630  << ", colInds.size() = " << colInds.size ()
631  << ", and vals.size() = " << vals.size () << "."
632  << std::endl;
633  }
634  if (errCode == 0) {
635  errCode = -1;
636  }
637  }
638 
639  // We don't consider reading having "failed" if we've reached
640  // end-of-file before reading maxNumEntPerMsg entries. It's OK if
641  // we got fewer triples than that. Furthermore, we have to send at
642  // least one message to the destination process, even if the read
643  // from the file failed.
644 
645  if (numEntRead == 0 || errCode != 0) {
646  // Send a message size of zero to the receiving process, to tell
647  // it that we have no triples to send, or that there was an error
648  // reading. The latter just means that we "go through the
649  // motions," then broadcast the error code.
650  sizeBuf[0] = 0;
651  if (debug && errStrm != NULL) {
652  std::ostringstream os;
653  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
654  << ", GO=" << typeid (GO).name () << ": "
655  << "Post send (size=0, errCode=" << errCode << ") "
656  << "to " << destRank << " with tag " << sizeTag << endl;
657  *errStrm << os.str ();
658  }
659  send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
660  return errCode;
661  }
662  else { // we read a nonzero # of triples, without error
663  const int numEnt = static_cast<int> (numEntRead);
664  int countSize = 0; // output argument
665  int triplesSize = 0; // output argument
666 
667  errCode = countPackTriplesCount (comm, countSize, errStrm);
668  // countSize should never be nonpositive, since we have to pack an
669  // integer size.
670  if (countSize <= 0 && errCode == 0) {
671  errCode = -1;
672  }
673 
674  if (errCode != 0) {
675  // Send zero to the receiving process, to tell it about the error.
676  sizeBuf[0] = 0;
677  if (debug && errStrm != NULL) {
678  std::ostringstream os;
679  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
680  << ", GO=" << typeid (GO).name () << ": "
681  << "Post send (size=0, error case) to " << destRank
682  << " with tag " << sizeTag << endl;
683  *errStrm << os.str ();
684  }
685  send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
686  return errCode;
687  }
688  else { // countPackTriplesCount succeeded
689  errCode = countPackTriples<SC, GO> (numEnt, comm, triplesSize, errStrm);
690  if (errCode != 0) {
691  // Send a message size of zero to the receiving process, to
692  // tell it that there was an error counting.
693  sizeBuf[0] = 0;
694  if (debug && errStrm != NULL) {
695  std::ostringstream os;
696  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
697  << ", GO=" << typeid (GO).name () << ": "
698  << "Post send (size=0, error case) to " << destRank
699  << " with tag " << sizeTag << endl;
700  *errStrm << os.str ();
701  }
702  send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
703  return errCode;
704  }
705  else { // countPackTriples succeeded; message packed & ready to send
706  // Send the message size (in bytes). We can use a nonblocking
707  // send here, and try to overlap with message packing.
708  const int outBufSize = countSize + triplesSize;
709  sizeBuf[0] = outBufSize;
710  if (debug && errStrm != NULL) {
711  std::ostringstream os;
712  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
713  << ", GO=" << typeid (GO).name () << ": "
714  << "Post isend (size=" << sizeBuf[0] << ") to " << destRank
715  << " with tag " << sizeTag << endl;
716  *errStrm << os.str ();
717  }
718  auto sizeReq = isend<int, int> (sizeBuf, destRank, sizeTag, comm);
719 
720  msgBuf.resize (outBufSize);
721  char* outBuf = msgBuf.getRawPtr ();
722 
723  // If anything goes wrong with packing, send the pack buffer
724  // anyway, since the receiving process expects a message.
725  int outBufCurPos = 0; // input/output argument
726  errCode = packTriplesCount (numEnt, outBuf, outBufSize,
727  outBufCurPos, comm, errStrm);
728  if (errCode == 0) {
729  errCode = packTriples<SC, GO> (rowInds.data (), colInds.data (),
730  vals.data (), numEnt, outBuf,
731  outBufSize, outBufCurPos, comm,
732  errStrm);
733  }
734  if (debug && errStrm != NULL) {
735  std::ostringstream os;
736  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
737  << ", GO=" << typeid (GO).name () << ": "
738  << "Post isend (packed data) to " << destRank
739  << " with tag " << msgTag << endl;
740  *errStrm << os.str ();
741  }
742  auto msgReq = isend<int, char> (msgBuf, destRank, msgTag, comm);
743 
744  // Wait on the two messages. It doesn't matter in what order
745  // we send them, because they have different tags. The
746  // receiving process will wait on the first message first, in
747  // order to get the size of the second message.
748  if (debug && errStrm != NULL) {
749  std::ostringstream os;
750  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
751  << ", GO=" << typeid (GO).name () << ": "
752  << "Wait on isend (size)" << endl;
753  *errStrm << os.str ();
754  }
755  sizeReq->wait ();
756  if (debug && errStrm != NULL) {
757  std::ostringstream os;
758  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
759  << ", GO=" << typeid (GO).name () << ": "
760  << "Wait on isend (packed data)" << endl;
761  *errStrm << os.str ();
762  }
763  msgReq->wait ();
764 
765  // This doesn't actually deallocate; it just resets sizes to zero.
766  rowInds.clear ();
767  colInds.clear ();
768  vals.clear ();
769  }
770  }
771  }
772  return errCode;
773 }
774 
782 
810 template<class SC, class GO, class CommRequestPtr>
811 int
812 recvOneBatchOfTriples (std::vector<GO>& rowInds,
813  std::vector<GO>& colInds,
814  std::vector<SC>& vals,
815  int& numEnt,
816  ::Teuchos::ArrayRCP<int>& sizeBuf,
817  ::Teuchos::ArrayRCP<char>& msgBuf,
818  CommRequestPtr& sizeReq,
819  const int srcRank,
820  const ::Teuchos::Comm<int>& comm,
821  const bool tolerant = false,
822  std::ostream* errStrm = NULL,
823  const bool debug = false)
824 {
825  using ::Tpetra::Details::unpackTriplesCount;
826  using ::Tpetra::Details::unpackTriples;
827  using ::Kokkos::ArithTraits;
828 
830  //constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
831  //constexpr int sizeTag = 42;
832  constexpr int msgTag = 43;
833  int errCode = 0; // return value
834  numEnt = 0; // output argument
835 
836  // Wait on the ireceive we preposted before calling this function.
837  if (debug && errStrm != NULL) {
838  std::ostringstream os;
839  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
840  << ", GO=" << typeid (GO).name () << ": "
841  << "Wait on irecv (size)" << std::endl;
842  *errStrm << os.str ();
843  }
844  sizeReq->wait ();
845  sizeReq = CommRequestPtr (NULL);
846  const int inBufSize = sizeBuf[0];
847  if (debug && errStrm != NULL) {
848  std::ostringstream os;
849  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
850  << ", GO=" << typeid (GO).name () << ": "
851  << "Received size: sizeBuf[0]=" << sizeBuf[0] << std::endl;
852  *errStrm << os.str ();
853  }
854 
855  if (inBufSize == 0) {
856  numEnt = 0;
857  rowInds.resize (0);
858  colInds.resize (0);
859  vals.resize (0);
860  }
861  else {
862  msgBuf.resize (inBufSize);
863  char* inBuf = msgBuf.getRawPtr ();
864 
865  if (debug && errStrm != NULL) {
866  std::ostringstream os;
867  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
868  << ", GO=" << typeid (GO).name () << ": "
869  << "Post irecv (packed data) " << "from " << srcRank
870  << " with tag " << msgTag << std::endl;
871  *errStrm << os.str ();
872  }
873  auto msgReq = ::Teuchos::ireceive (msgBuf, srcRank, msgTag, comm);
874  if (debug && errStrm != NULL) {
875  std::ostringstream os;
876  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
877  << ", GO=" << typeid (GO).name () << ": "
878  << "Wait on irecv (packed data)" << std::endl;
879  *errStrm << os.str ();
880  }
881  msgReq->wait ();
882 
883  int inBufCurPos = 0; // output argument
884  errCode = unpackTriplesCount (inBuf, inBufSize, inBufCurPos,
885  numEnt, comm, errStrm);
886  if (errCode == 0) {
887  rowInds.resize (numEnt);
888  colInds.resize (numEnt);
889  vals.resize (numEnt);
890  errCode = unpackTriples<SC, GO> (inBuf, inBufSize, inBufCurPos,
891  rowInds.data (), colInds.data (),
892  vals.data (), numEnt, comm, errStrm);
893  }
894  }
895  return errCode;
896 }
897 
898 } // namespace Impl
899 
900 //
901 // SKIP DOWN TO HERE FOR "PUBLIC" INTERFACE
902 //
903 
937 template<class SC, class GO>
938 int
939 readAndDealOutTriples (std::istream& inputStream, // only valid on Proc 0
940  std::size_t& curLineNum, // only valid on Proc 0
941  std::size_t& totalNumEntRead, // only valid on Proc 0
942  std::function<int (const GO, const GO, const SC&)> processTriple,
943  const std::size_t maxNumEntPerMsg,
944  const ::Teuchos::Comm<int>& comm,
945  const bool tolerant = false,
946  std::ostream* errStrm = NULL,
947  const bool debug = false)
948 {
949  using Kokkos::ArithTraits;
950  using std::endl;
951  using std::size_t;
952 
953  constexpr int srcRank = 0;
954  //constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
956  constexpr int sizeTag = 42;
957  //constexpr int msgTag = 43;
958  const int myRank = comm.getRank ();
959  const int numProcs = comm.getSize ();
960  int errCode = 0;
961 
962  ::Teuchos::ArrayRCP<int> sizeBuf (1);
963  ::Teuchos::ArrayRCP<char> msgBuf; // to be resized as needed
964 
965  // Temporary storage for reading & packing (on Process srcRank) or
966  // unpacking (every other process) triples.
967  std::vector<GO> rowInds;
968  std::vector<GO> colInds;
969  std::vector<SC> vals;
970  rowInds.reserve (maxNumEntPerMsg);
971  colInds.reserve (maxNumEntPerMsg);
972  vals.reserve (maxNumEntPerMsg);
973 
974  totalNumEntRead = 0;
975  if (myRank == srcRank) {
976  // Loop around through all the processes, including this one, over
977  // and over until we reach the end of the file, or an error occurs.
978  int destRank = 0;
979  bool lastMessageWasLegitZero = false;
980  for ( ;
981  ! inputStream.eof () && errCode == 0;
982  destRank = (destRank + 1) % numProcs) {
983 
984  size_t curNumEntRead = 0; // output argument of below
985  if (destRank == srcRank) {
986  // We can read and process the triples directly. We don't
987  // need to use intermediate storage, because we don't need to
988  // pack and send the triples.
989  const int readErrCode =
990  Impl::readTriples<SC, GO> (inputStream, curLineNum, curNumEntRead,
991  processTriple, maxNumEntPerMsg, tolerant,
992  errStrm, debug);
993  if (debug && errStrm != NULL) {
994  std::ostringstream os;
995  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
996  << ", GO=" << typeid (GO).name () << ": "
997  << "(dest=src) readTriples returned curNumEntRead="
998  << curNumEntRead << ", errCode=" << readErrCode << endl;
999  *errStrm << os.str ();
1000  }
1001  errCode = (readErrCode != 0) ? readErrCode : errCode;
1002  }
1003  else {
1004  if (false && debug && errStrm != NULL) {
1005  std::ostringstream os;
1006  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1007  << ", GO=" << typeid (GO).name () << ": "
1008  << "Calling readAndSend... with destRank=" << destRank << endl;
1009  *errStrm << os.str ();
1010  }
1011  // Read, pack, and send the triples to destRank.
1012  const int readAndSendErrCode =
1013  Impl::readAndSendOneBatchOfTriples<SC, GO> (inputStream, curLineNum,
1014  curNumEntRead,
1015  sizeBuf, msgBuf,
1016  rowInds, colInds, vals,
1017  maxNumEntPerMsg, destRank,
1018  comm, tolerant, errStrm,
1019  debug);
1020  totalNumEntRead += curNumEntRead;
1021  if (debug && errStrm != NULL) {
1022  std::ostringstream os;
1023  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1024  << ", GO=" << typeid (GO).name () << ": "
1025  << "readAndSend... with destRank=" << destRank
1026  << " returned curNumEntRead=" << curNumEntRead
1027  << ", errCode=" << readAndSendErrCode << endl;
1028  *errStrm << os.str ();
1029  }
1030  errCode = (readAndSendErrCode != 0) ? readAndSendErrCode : errCode;
1031  if (readAndSendErrCode == 0 && curNumEntRead == 0) {
1032  lastMessageWasLegitZero = true;
1033  if (debug && errStrm != NULL) {
1034  std::ostringstream os;
1035  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1036  << ", GO=" << typeid (GO).name () << ": "
1037  << "Last send to " << destRank << " with tag " << sizeTag
1038  << " was legit zero, counts as termination" << endl;
1039  *errStrm << os.str ();
1040  }
1041  }
1042  }
1043  } // loop around through processes until done reading file, or error
1044 
1045  // Loop around through the remaining processes, and tell them that
1046  // we're done, by sending zero. If the last message we sent to
1047  // destRank was zero, then skip that process, since it only
1048  // expects one message of size zero. Note that destRank got
1049  // incremented mod numProcs at end of loop, so we have to
1050  // decrement it mod numProcs.
1051  destRank = (destRank - 1) % numProcs;
1052  if (destRank < 0) { // C mod operator does not promise positivity
1053  destRank = destRank + numProcs;
1054  }
1055 
1056  const int startRank = lastMessageWasLegitZero ? (destRank+1) : destRank;
1057  for (int outRank = startRank; outRank < numProcs; ++outRank) {
1058  if (outRank != srcRank) {
1059  if (debug && errStrm != NULL) {
1060  std::ostringstream os;
1061  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1062  << ", GO=" << typeid (GO).name () << ": "
1063  << "Post send (size, termination msg) to " << outRank
1064  << " with tag " << sizeTag << "(was last message legit zero? "
1065  << (lastMessageWasLegitZero ? "true" : "false") << ")" << endl;
1066  *errStrm << os.str ();
1067  }
1068  sizeBuf[0] = 0;
1069  ::Teuchos::send (sizeBuf.getRawPtr (), 1, outRank, sizeTag, comm);
1070  }
1071  }
1072  }
1073  else {
1074  while (true) {
1075  // Prepost a message to receive the size (in bytes) of the
1076  // incoming packet.
1077  sizeBuf[0] = 0; // superfluous, but safe
1078  if (debug && errStrm != NULL) {
1079  std::ostringstream os;
1080  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1081  << ", GO=" << typeid (GO).name () << ": "
1082  << "Post irecv (size) from " << srcRank
1083  << " with tag " << sizeTag << std::endl;
1084  *errStrm << os.str ();
1085  }
1086  auto sizeReq = ::Teuchos::ireceive (sizeBuf, srcRank, sizeTag, comm);
1087 
1088  int numEnt = 0; // output argument
1089  const int recvErrCode =
1090  Impl::recvOneBatchOfTriples (rowInds, colInds, vals, numEnt, sizeBuf,
1091  msgBuf, sizeReq, srcRank, comm, tolerant,
1092  errStrm, debug);
1093  if (debug && errStrm != NULL) {
1094  std::ostringstream os;
1095  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1096  << ", GO=" << typeid (GO).name () << ": "
1097  << "recvOneBatchOfTriples returned numEnt=" << numEnt
1098  << ", errCode=" << recvErrCode << endl;
1099  *errStrm << os.str ();
1100  }
1101  errCode = (recvErrCode != 0) ? recvErrCode : errCode;
1102 
1103  if (numEnt != static_cast<int> (rowInds.size ()) ||
1104  numEnt != static_cast<int> (colInds.size ()) ||
1105  numEnt != static_cast<int> (vals.size ())) {
1106  errCode = (errCode == 0) ? -1 : errCode;
1107  if (errStrm != NULL) {
1108  *errStrm << "recvOneBatchOfTriples produced inconsistent data sizes. "
1109  << "numEnt = " << numEnt
1110  << ", rowInds.size() = " << rowInds.size ()
1111  << ", colInds.size() = " << colInds.size ()
1112  << ", vals.size() = " << vals.size () << "."
1113  << endl;
1114  }
1115  } // if sizes inconsistent
1116 
1117  // Sending zero items is how Process srcRank tells this process
1118  // that it (Process srcRank) is done sending out data.
1119  if (numEnt == 0) {
1120  break;
1121  }
1122 
1123  for (int k = 0; k < numEnt && errCode == 0; ++k) {
1124  const int curErrCode = processTriple (rowInds[k], colInds[k], vals[k]);
1125  errCode = (curErrCode == 0) ? errCode : curErrCode;
1126  }
1127  } // while we still get messages from srcRank
1128  }
1129 
1130  if (debug && errStrm != NULL) {
1131  std::ostringstream os;
1132  os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1133  << ", GO=" << typeid (GO).name () << ": "
1134  << "Done with send/recv loop" << endl;
1135  *errStrm << os.str ();
1136  }
1137  // Do a bitwise OR to get an error code that is nonzero if and only
1138  // if any process' local error code is nonzero.
1139  using ::Teuchos::outArg;
1140  using ::Teuchos::REDUCE_BOR;
1141  using ::Teuchos::reduceAll;
1142  const int lclErrCode = errCode;
1143  reduceAll<int, int> (comm, REDUCE_BOR, lclErrCode, outArg (errCode));
1144  return errCode;
1145 }
1146 
1147 } // namespace Details
1148 } // namespace Tpetra
1149 
1150 #endif // TPETRA_DETAILS_READTRIPLES_HPP
int packTriplesCount(const int, char[], const int, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Pack the count (number) of matrix triples.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
int readAndDealOutTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &totalNumEntRead, std::function< int(const GO, const GO, const SC &)> processTriple, const std::size_t maxNumEntPerMsg, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
On Process 0 in the given communicator, read sparse matrix entries (in chunks of at most maxNumEntPer...
Implementation of the readLine stand-alone function in this namespace (see below).
int unpackTriplesCount(const char[], const int, int &, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Unpack just the count of triples from the given input buffer.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
int countPackTriplesCount(const ::Teuchos::Comm< int > &, int &size, std::ostream *errStrm)
Compute the buffer size required by packTriples for packing the number of matrix entries (&quot;triples&quot;)...
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...