Zoltan2_Directory_Comm.cpp
1 /*
2  * @HEADER
3  *
4  * ***********************************************************************
5  *
6  * Zoltan2 Directory for Load-balancing, Partitioning, Ordering and Coloring
7  * Copyright 2012 Sandia Corporation
8  *
9  * Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
10  * the U.S. Government retains certain rights in this software.
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions are
14  * met:
15  *
16  * 1. Redistributions of source code must retain the above copyright
17  * notice, this list of conditions and the following disclaimer.
18  *
19  * 2. Redistributions in binary form must reproduce the above copyright
20  * notice, this list of conditions and the following disclaimer in the
21  * documentation and/or other materials provided with the distribution.
22  *
23  * 3. Neither the name of the Corporation nor the names of the
24  * contributors may be used to endorse or promote products derived from
25  * this software without specific prior written permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
28  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
30  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
31  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
32  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
33  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
34  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
35  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
36  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
37  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  *
39  * Questions? Contact Karen Devine kddevin@sandia.gov
40  * Erik Boman egboman@sandia.gov
41  *
42  * ***********************************************************************
43  *
44  * @HEADER
45  */
46 
47 #include "Zoltan2_Directory_Comm.hpp"
48 #include <stdexcept>
49 #include <memory>
50 
51 namespace Zoltan2 {
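// Zoltan2_Directory_Plan::getInvertedValues fills this plan as the inverse of
// 'from': every message 'from' sends becomes one this plan receives and vice
// versa, so the *_to and *_from fields simply trade places (lengths_to takes
// from->lengths_from, procs_to takes from->procs_from, and so on) and
// nsends/nrecvs swap. This is how do_reverse builds its reverse plan.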
52 
53 void Zoltan2_Directory_Plan::getInvertedValues(Zoltan2_Directory_Plan *from) {
54  total_recv_size = 0;
55  for (int i = 0; i < from->nsends + from->self_msg; i++) {
56  total_recv_size += from->lengths_to[i];
57  }
58 
59  max_send_size = 0;
60  for (int i = 0; i < from->nrecvs; i++) {
61  if (from->lengths_from[i] > max_send_size) {
62  max_send_size = from->lengths_from[i];
63  }
64  }
65 
66  nvals = from->nvals_recv;
67  nvals_recv = from->nvals;
68  lengths_to = from->lengths_from;
69  procs_to = from->procs_from;
70  indices_to = from->indices_from;
71 
72  starts_to = from->starts_from;
73  lengths_from = from->lengths_to;
74  procs_from = from->procs_to;
75  indices_from = from->indices_to;
76 
77  starts_from = from->starts_to;
78  nrecvs = from->nsends;
79  nsends = from->nrecvs;
80  self_msg = from->self_msg;
81  comm = from->comm;
82 }
83 
84 
85 void Zoltan2_Directory_Plan::print(const std::string& headerMessage) const {
86 
87  #define PRINT_VECTOR(v) \
88  if(v != Teuchos::null) { \
89  std::cout << " " << #v << " "; \
90  for(Teuchos::ArrayRCP<int>::size_type n = 0; n < v.size(); ++n) { \
91  std::cout << v[n] << " "; \
92  } \
93  std::cout << std::endl; \
94  }
95 
96  #define PRINT_VAL(val) std::cout << " " << #val << ": " << val << std::endl;
97 
98  for(int proc = 0; proc < comm->getSize(); ++proc) {
99  comm->barrier();
100  if(proc == comm->getRank()) {
101 
102  std::cout << "Rank " << proc << " " << headerMessage << std::endl;
103 
119 
128  }
129  }
130  comm->barrier();
131 }
132 
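// The constructor analyzes 'assign' (one destination rank per locally owned
// value; a negative entry means "skip this value") and builds the forward
// plan: which ranks I send to, how many items go to each, and, via
// invert_map, which ranks will send to me. If the values are already grouped
// by destination rank, indices_to stays null and sends can go straight out of
// the caller's buffer; otherwise indices_to records the permutation needed to
// pack a send buffer in do_post.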
133 Zoltan2_Directory_Comm::Zoltan2_Directory_Comm(
134  int nvals, /* number of values I currently own */
135  const Teuchos::ArrayRCP<int> &assign, /* processor assignment for all my values */
136  Teuchos::RCP<const Teuchos::Comm<int> > comm, /* communicator */
137  int tag) : /* message tag I can use */
138  comm_(comm),
139  plan_forward(NULL)
140 {
141  if (comm == Teuchos::null){
142  throw std::logic_error("Invalid communicator: MPI_COMM_NULL.");
143  }
144 
145  int my_proc = comm->getRank(); /* my processor tag in communicator */
146  int nprocs = comm->getSize(); /* number of processors in communicator */
147 
148  /* First check to see if items are grouped by processor with no gaps. */
149  /* If so, indices_to should be NULL (= identity) */
150 
151  /* Make data structures that will allow me to traverse arrays quickly. */
152  /* Begin by determining number of objects I'm sending to each processor. */
153  Teuchos::ArrayRCP<int> starts(new int[nprocs + 1], 0, nprocs + 1, true);
154  for(int n = 0; n < starts.size(); ++n) {
155  starts[n] = 0;
156  }
157 
158  /* Note: Negative assign value means ignore item. */
159  /* Non-trailing negatives mean data not packed so need send_buf. */
160  /* Could (but don't) allow negatives between processor blocks w/o buf. */
161  int nactive = 0; /* number of values to remap */
162 
163  int no_send_buff = 1; /* is data nicely grouped by processor? */
164 
165  int prev_proc = nprocs; /* processor on previous loop pass */
166 
167  for (int i = 0; i < nvals; i++) {
168  int proc = assign[i];
169  if (no_send_buff && proc != prev_proc) { /* Checks if blocked by proc */
170  if (proc >= 0 && (starts[proc] || prev_proc < 0)) {
171  no_send_buff = 0;
172  }
173  else {
174  prev_proc = proc;
175  }
176  }
177  if (proc >= 0) {
178  ++starts[proc];
179  ++nactive;
180  }
181  }
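  /* Example of the grouping test above (illustrative values):
   *   assign = {3,3,1,1,1} -> each rank appears in one contiguous block, so
   *                           no_send_buff stays 1 and indices_to can remain
   *                           null (identity).
   *   assign = {3,1,3}     -> rank 3 reappears after rank 1, so no_send_buff
   *                           drops to 0 and a packed send buffer plus
   *                           indices_to will be built below. */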
182 
183  int self_msg = (starts[my_proc] != 0); /* do I have data for myself? */
184 
185  Teuchos::ArrayRCP<int> lengths_to; /* lengths I'll send to */
186  Teuchos::ArrayRCP<int> procs_to; /* processors I'll send to */
187  Teuchos::ArrayRCP<int> starts_to; /* where in list my sends begin */
188  Teuchos::ArrayRCP<int> indices_to; /* local_id values I'll be sending */
189 
190  int max_send_size = 0; /* size of longest message I send */
191  int nsends = 0; /* # procs I'll send to (including self) */
192  int nrecvs = 0; /* # procs I'll recv from (including self) */
193 
194  if (no_send_buff) {
195  /* Grouped by processor. Array indices_to can be NULL (= identity) */
196  nsends = 0;
197  for (int i = 0; i < nprocs; i++) {
198  if (starts[i] != 0) ++nsends;
199  }
200 
201  lengths_to.resize(nsends);
202  starts_to.resize(nsends);
203  procs_to.resize(nsends);
204 
205  int index = 0; /* index into list of objects */
206  /* Note that procs_to is in the order the data was passed in. */
207  for (int i = 0; i < nsends; i++) {
208  starts_to[i] = index;
209  int proc = assign[index];
210  procs_to[i] = proc;
211  index += starts[proc];
212  }
213 
214  /* Now sort the outgoing procs. */
215  /* This keeps recvs deterministic if I ever invert communication */
216  /* It also allows for better balance of traffic in comm_do */
217  sort_ints(procs_to, starts_to);
218 
219  max_send_size = 0;
220  for (int i = 0; i < nsends; i++) {
221  int proc = procs_to[i];
222  lengths_to[i] = starts[proc];
223  if (proc != my_proc && lengths_to[i] > max_send_size) {
224  max_send_size = lengths_to[i];
225  }
226  }
227  }
228  else { /* Not grouped by processor. More complex data structures. */
229  /* Sum starts values to be offsets into indices_to array. */
230  nsends = (starts[0] != 0);
231  for (int i = 1; i < nprocs; i++) {
232  if (starts[i] != 0)
233  ++nsends;
234  starts[i] += starts[i - 1];
235  }
236 
237  for (int i = nprocs - 1; i; i--) {
238  starts[i] = starts[i - 1];
239  }
240 
241  starts[0] = 0;
242 
243  indices_to = (nactive > 0) ?
244  Teuchos::arcp(new int[nactive], 0, nactive, true) : Teuchos::null;
245 
246  for (int i = 0; i < nvals; i++) {
247  int proc = assign[i];
248  if (proc >= 0) {
249  indices_to[starts[proc]] = i;
250  ++starts[proc];
251  }
252  }
253 
254  /* Indices_to array now has the data in clumps for each processor. */
255  /* Now reconstruct starts array to index into indices_to. */
256  for (int i = nprocs - 1; i; i--) {
257  starts[i] = starts[i - 1];
258  }
259  starts[0] = 0;
260  starts[nprocs] = nactive;
261 
262  /* Construct lengths_to, starts_to and procs_to arrays. */
263  /* Note: If indices_to is needed, procs are in increasing order */
264  lengths_to.resize(nsends);
265  starts_to.resize(nsends);
266  procs_to.resize(nsends);
267 
268  int j = 0;
269  max_send_size = 0;
270  for (int i = 0; i < nprocs; i++) {
271  if (starts[i + 1] != starts[i]) {
272  starts_to[j] = starts[i];
273  lengths_to[j] = starts[i + 1] - starts[i];
274  if (i != my_proc && lengths_to[j] > max_send_size) {
275  max_send_size = lengths_to[j];
276  }
277  procs_to[j] = i;
278  j++;
279  }
280  }
281  }
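  /* Worked example for the branch above (illustrative): with nprocs = 3 and
   * assign = {2,0,2,1}, the counting pass gives starts = {1,1,2}; after the
   * prefix sums and shifts, indices_to = {1,3,0,2} (the value bound for rank
   * 0 first, then rank 1, then the two values for rank 2), and the final
   * arrays are procs_to = {0,1,2}, starts_to = {0,1,2}, lengths_to = {1,1,2}. */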
282 
283  /* Now change nsends to count only non-self messages */
284  nsends -= self_msg;
285 
286  /* Determine how many messages & what length I'll receive. */
287  Teuchos::ArrayRCP<int> lengths_from; /* lengths of messages I'll receive */
288  Teuchos::ArrayRCP<int> procs_from; /* processors I'll receive from */
289  int out_of_mem = 0; // TODO refactor this bit
290 
291  int comm_flag = invert_map(lengths_to, procs_to, nsends, self_msg,
292  lengths_from, procs_from, &nrecvs, my_proc, nprocs,
293  out_of_mem, tag, comm);
294 
295  /* pointers for where to put recv data */
296  Teuchos::ArrayRCP<int> starts_from(
297  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
298  int j = 0;
299  for (int i = 0; i < nrecvs + self_msg; i++) {
300  starts_from[i] = j;
301  j += lengths_from[i];
302  }
303 
304  if (comm_flag != 0) {
305  throw std::logic_error("Failed to construct Zoltan2_Directory_Comm");
306  }
307 
308  int total_recv_size = 0; /* total size of messages I recv */
309  for (int i = 0; i < nrecvs + self_msg; i++) {
310  total_recv_size += lengths_from[i];
311  }
312 
313  plan_forward = new Zoltan2_Directory_Plan;
314  plan_forward->lengths_to = lengths_to;
315  plan_forward->starts_to = starts_to;
316  plan_forward->procs_to = procs_to;
317  plan_forward->indices_to = indices_to;
318  plan_forward->lengths_from = lengths_from;
319  plan_forward->starts_from = starts_from;
320  plan_forward->procs_from = procs_from;
321  plan_forward->nvals = nvals;
322  plan_forward->nvals_recv = total_recv_size;
323  plan_forward->nrecvs = nrecvs;
324  plan_forward->nsends = nsends;
325  plan_forward->self_msg = self_msg;
326  plan_forward->max_send_size = max_send_size;
327  plan_forward->total_recv_size = total_recv_size;
328  plan_forward->maxed_recvs = 0;
329  plan_forward->comm = comm;
330 
331  if (MPI_RECV_LIMIT > 0) {
332 
333  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (1)"); // needs unit testing
334 
335  /* If we have a limit to the number of posted receives we are allowed,
336  ** and our plan has exceeded that, then switch to an MPI_Alltoallv so
337  ** that we will have fewer receives posted when we do the communication.
338  */
339  int global_nrecvs;
340  Teuchos::reduceAll(*comm, Teuchos::REDUCE_MAX, 1, &nrecvs, &global_nrecvs);
341  if (global_nrecvs > MPI_RECV_LIMIT){
342  plan_forward->maxed_recvs = 1;
343  }
344  }
345 
346  if (plan_forward->maxed_recvs == 0) {
347  // See notes in header for MPI_Request
348  plan_forward->request.resize(plan_forward->nrecvs);
349  }
350 
351  nrec = total_recv_size;
352 }
353 
354 Zoltan2_Directory_Comm::~Zoltan2_Directory_Comm()
355 {
356  delete plan_forward;
357 }
358 
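/* invert_map answers "who sends to me, and how much?" given only what I send.
 * Each rank raises a flag per destination; the global sum below tells rank p
 * how many messages are headed its way, and the scatter hands each rank its
 * own count. The per-message lengths are then exchanged point to point, with
 * the receives posted against MPI_ANY_SOURCE so the sending ranks are
 * discovered from the completed requests. */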
359 int Zoltan2_Directory_Comm::invert_map(
360  const Teuchos::ArrayRCP<int> &lengths_to, /* number of items I'm sending */
361  const Teuchos::ArrayRCP<int> &procs_to, /* procs I send to */
362  int nsends, /* number of messages I'll send */
363  int self_msg, /* do I copy data to myself? */
364  Teuchos::ArrayRCP<int> &lengths_from, /* number of items I'm receiving */
365  Teuchos::ArrayRCP<int> &procs_from, /* procs I recv lengths from */
366  int *pnrecvs, /* number of messages I receive */
367  int my_proc, /* my processor number */
368  int nprocs, /* total number of processors */
369  int /* out_of_mem */, /* tell everyone I'm out of memory? */
370  int tag, /* message tag I can use */
371  Teuchos::RCP<const Teuchos::Comm<int> > comm) /* communicator */
372 {
373  Teuchos::ArrayRCP<int> msg_count(new int[nprocs], 0, nprocs, true);
374  Teuchos::ArrayRCP<int> counts(new int[nprocs], 0, nprocs, true);
375  for(int i = 0; i < nprocs; ++i) {
376  msg_count[i] = 0;
377  counts[i] = 1;
378  }
379 
380  for (int i = 0; i < nsends + self_msg; i++) {
381  msg_count[procs_to[i]] = 1;
382  }
383 
384  /*
385  * KDDKDD: Replaced MPI_Reduce_scatter with MPI_Reduce and MPI_Scatter
386  * KDDKDD: to avoid reported problems with MPICH 1.5.2.1.
387  * KDDKDD: Some sort of MPI_TYPE_INDEXED error.
388  * KDDKDD: Bug fix suggested by Clark Dohrmann and Rob Hoekstra.
389  * KDDKDD: July 20, 2004
390 
391  MPI_Reduce_scatter((void *) msg_count, (void *) &nrecvs, counts, MPI_INT,
392  MPI_SUM, comm);
393  */
394  Teuchos::reduceAll<int>(*comm, Teuchos::REDUCE_SUM, nprocs,
395  msg_count.getRawPtr(), counts.getRawPtr());
396 
397  int nrecvs = 0; /* number of messages I'll receive */
398 
399  Teuchos::scatter<int, int>(&(counts[0]), 1, &nrecvs, 1, 0, *comm);
400 
401  int max_nrecvs = 0;
402  if (my_proc == 0) {
403  for (int i=0; i < nprocs; i++) {
404  if (counts[i] > max_nrecvs) {
405  max_nrecvs = counts[i];
406  }
407  }
408  }
409 
410  Teuchos::broadcast(*comm, 0, &max_nrecvs);
411 
412  if(nrecvs > 0) {
413  lengths_from.resize(nrecvs); /* number of items I'm receiving */
414  procs_from.resize(nrecvs); /* processors I'll receive from */
415  }
416 
417  if (MPI_RECV_LIMIT == 0 || max_nrecvs <= MPI_RECV_LIMIT) {
418  // See notes in header for MPI_Request
419  Teuchos::ArrayRCP<Teuchos::RCP<Teuchos::CommRequest<int> > > requests(nrecvs);
420 
421  /* Note: I'm counting on having a unique tag or some of my incoming */
422  /* messages might get confused with others. */
423  for (int i=0; i < nrecvs; i++) {
424 #ifdef HAVE_MPI // Teuchos::ireceive not implemented for Serial - Serial is just for debugging
425  Teuchos::ArrayRCP<int> single_elem(&lengths_from[i], 0, 1, false);
426  requests[i] = Teuchos::ireceive(single_elem, MPI_ANY_SOURCE, tag, *comm);
427 #endif
428  }
429 
430  for (int i=0; i < nsends+self_msg; i++) {
431 #ifdef HAVE_MPI // Teuchos::send not implemented for Serial - Serial is just for debugging
432  Teuchos::send(&lengths_to[i], 1, procs_to[i], tag, *comm);
433 #endif
434  }
435 
436  for (int i=0; i < nrecvs; i++) {
437 #ifdef HAVE_MPI
438  procs_from[i] = requests[i]->wait()->getSourceRank();
439 #else
440  // above Teuchos MPI calls not supported for Serial so manually do the transfer.
441  // We don't really need Serial for this class but it helps with debugging to have a serial test that can run.
442  lengths_from[i] = lengths_to[i];
443 #endif
444  }
445 
446  }
447  else { /* some large HPC machines have a limit on number of posted receives */
448  Teuchos::ArrayRCP<int> sendbuf(new int[nprocs], 0, nprocs, true);
449  Teuchos::ArrayRCP<int> recvbuf(new int[nprocs], 0, nprocs, true);
450 
451  for (int i=0; i < nsends + self_msg; i++) {
452  sendbuf[procs_to[i]] = lengths_to[i];
453  }
454 
455  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (2)"); // needs unit testing
456  // Did not refactor this - need Teuchos form but this is not tested code.
457  // MPI_Alltoall(&(sendbuf[0]), 1, MPI_INT, &(recvbuf[0]), 1, MPI_INT, getRawComm());
458 
459  for (int i=0, j=0; i < nprocs; i++) {
460  if (recvbuf[i] > 0){
461  lengths_from[j] = recvbuf[i];
462  procs_from[j] = i;
463  if (++j == nrecvs) {
464  break;
465  }
466  }
467  }
468  }
469 
470  /* Sort recv lists to keep execution deterministic (e.g. for debugging) */
471  sort_ints(procs_from, lengths_from);
472 
473  *pnrecvs = nrecvs - self_msg; /* Only return number of true messages */
474 
475  return 0;
476 }
477 
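/* sort_ints is a counting sort on small non-negative integers (typically
 * processor ranks): it tallies how often each value occurs, turns the tallies
 * into starting offsets, and rewrites both arrays in sorted order, reordering
 * vals_other in lockstep with vals_sort. For example (illustrative values),
 * vals_sort = {3,1,3,0} with vals_other = {10,11,12,13} becomes
 * vals_sort = {0,1,3,3} and vals_other = {13,11,10,12}. */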
478 int Zoltan2_Directory_Comm::sort_ints(
479  Teuchos::ArrayRCP<int> &vals_sort, /* values to be sorted */
480  Teuchos::ArrayRCP<int> &vals_other) /* other array to be reordered w/ sort */
481 {
482  // TODO: Check - perhaps we can skip all of these for efficiency
483  if (vals_sort == Teuchos::null || vals_sort.size() == 0) {
484  return 1;
485  }
486  if (vals_other == Teuchos::null || vals_other.size() == 0) {
487  return 1;
488  }
489  if (vals_sort == Teuchos::null || vals_sort.size() == 1) {
490  return 0; /* fastest way to sort 1 item is to return */
491  }
492 
493  /* find largest value (sort sometimes used for non processor lists) */
494  int already_sorted = 1; /* flag indicating whether vals_sort is
495  already sorted; can exit early and skip
496  memory allocations if it is. */
497  int top = vals_sort[0]; /* largest integer to sort, smallest is assumed 0 */
498  for (Teuchos::ArrayRCP<int>::size_type i = 1; i < vals_sort.size(); i++) {
499  if (vals_sort[i-1] > vals_sort[i]) {
500  already_sorted = 0;
501  }
502  if (top < vals_sort[i]) {
503  top = vals_sort[i];
504  }
505  }
506 
507  if (already_sorted) {
508  return 0;
509  }
510 
511  Teuchos::ArrayRCP<int> store(new int[top+2], 0, top+2, true);
512  for(int n = 0; n < store.size(); ++n) {
513  store[n] = 0;
514  }
515 
516  Teuchos::ArrayRCP<int> copy_sort(new int[vals_sort.size()], 0, vals_sort.size(), true);
517  for(Teuchos::ArrayRCP<int>::size_type n = 0; n < copy_sort.size(); ++n) {
518  copy_sort[n] = vals_sort[n]; // TODO - use deepCopy method?
519  }
520 
521  Teuchos::ArrayRCP<int> copy_other(new int[vals_other.size()], 0, vals_other.size(), true);
522  for(Teuchos::ArrayRCP<int>::size_type n = 0; n < copy_other.size(); ++n) {
523  copy_other[n] = vals_other[n]; // TODO - use deepCopy method?
524  }
525 
526  // TODO: May want to modernize this ptr handling - however I didn't want
527  // to introduce inefficiencies so for now have kept the original structure
528  int *p = &(store[1]);
529  for (Teuchos::ArrayRCP<int>::size_type i = 0; i < vals_sort.size(); i++) {
530  p[copy_sort[i]]++; /* count number of occurrences */
531  }
532 
533  for (int i = 1; i < top+1; i++) {
534  p[i] += p[i-1]; /* compute partial sums */
535  }
536  /* assert: p[top] = nvals */
537  p = &(store[0]); /* effectively shifts down by one */
538  for (Teuchos::ArrayRCP<int>::size_type i = 0; i < vals_sort.size(); i++) {
539  vals_sort[p[copy_sort[i]]] = copy_sort[i];
540  vals_other[p[copy_sort[i]]] = copy_other[i];
541  ++p[copy_sort[i]];
542  }
543 
544  return 0;
545 }
546 
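/* do_forward executes the plan built in the constructor: every item is nbytes
 * long, and after the call recv_data holds the items the other ranks assigned
 * to me. A minimal usage sketch (illustrative only; it assumes an initialized
 * Teuchos communicator, and getNRec() stands in for whatever accessor the
 * header exposes for the receive count 'nrec' computed in the constructor):
 *
 *   Teuchos::RCP<const Teuchos::Comm<int> > comm =
 *     Teuchos::DefaultComm<int>::getComm();
 *   Teuchos::ArrayRCP<int> assign = Teuchos::arcp<int>(nvals);  // dest rank per item
 *   Teuchos::ArrayRCP<char> send = Teuchos::arcp<char>(nvals * nbytes);
 *   // ... fill assign and send ...
 *   Zoltan2_Directory_Comm dcomm(nvals, assign, comm, tag);
 *   Teuchos::ArrayRCP<char> recv =
 *     Teuchos::arcp<char>(dcomm.getNRec() * nbytes);  // hypothetical accessor
 *   dcomm.do_forward(tag, send, nbytes, recv);
 */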
547 int Zoltan2_Directory_Comm::do_forward(
548  int tag, /* message tag for communicating */
549  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
550  int nbytes, /* msg size */
551  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
552 {
553  int status = 0;
554 
555  if (!plan_forward->maxed_recvs) {
556  status = do_post (plan_forward, tag, send_data, nbytes, recv_data);
557  if (status == 0) {
558  status = do_wait (plan_forward, tag, send_data, nbytes, recv_data);
559  }
560  }
561  else {
562  status = do_all_to_all(plan_forward, send_data, nbytes, recv_data);
563  }
564 
565  return status;
566 }
567 
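/* do_post is the first half of do_forward: it posts the non-blocking receives
 * (directly into recv_data when no reordering is needed, otherwise into an
 * internal buffer), then sends to the other ranks, starting with the first
 * destination rank at or above my own to spread traffic, and copies any
 * self-message straight into the receive buffer. The reduceAll on out_of_mem
 * below doubles as the barrier that guarantees every receive is posted before
 * the ready-sends fire. */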
568 int Zoltan2_Directory_Comm::do_post(
569  Zoltan2_Directory_Plan *plan, /* communication data structure */
570  int tag, /* message tag for communicating */
571  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
572  int nbytes, /* msg size */
573  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
574 {
575  /* Check input parameters */
576  if (!plan) {
577  throw std::logic_error("Communication plan = NULL");
578  }
579 
580  /* If not point-to-point, we currently do synchronous communications */
581  if (plan->maxed_recvs) {
582  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (3)"); // needs unit testing
583  return do_all_to_all(plan, send_data, nbytes, recv_data);
584  }
585 
586  int my_proc = plan->comm->getRank(); /* processor ID */
587  if ((plan->nsends + plan->self_msg) && !send_data.size()) {
588  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (4)"); // needs unit testing
589  size_t sum = 0;
590  if (plan->sizes_to.size()) { /* Not an error if all sizes_to == 0 */
591  for (int i = 0; i < (plan->nsends + plan->self_msg); i++) {
592  sum += plan->sizes_to[i];
593  }
594  }
595  if (!plan->sizes_to.size() || (plan->sizes_to.size() && sum)) {
596  throw std::logic_error("nsends not zero, but send_data = NULL");
597  }
598  }
599  if ((plan->nrecvs + plan->self_msg) && recv_data == Teuchos::null) {
600  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (5)"); // needs unit testing
601  size_t sum = 0;
602  if (plan->sizes_from != Teuchos::null) /* Not an error if all sizes_from == 0 */
603  for (int i = 0; i < (plan->nrecvs + plan->self_msg); i++)
604  sum += plan->sizes_from[i];
605  if (!plan->sizes_from.size() || (plan->sizes_from.size() && sum)) {
606  throw std::logic_error("nrecvs not zero, but recv_data = NULL");
607  }
608  }
609 
610  /* Post irecvs */
611  if (plan->indices_from == Teuchos::null) {
612  /* Data can go directly into user space. */
613  plan->recv_buff = recv_data;
614  }
615  else { /* Need to buffer receive to reorder */
616  size_t rsize = (size_t) (plan->total_recv_size) * (size_t) nbytes;
617  plan->recv_buff = Teuchos::arcp(new char[rsize], 0, rsize, true);
618  }
619 
620  size_t self_recv_address = 0; /* where in recv_data self info starts */
621  if (!plan->using_sizes) { /* All data the same size */
622  int k = 0;
623  for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
624  if (plan->procs_from[i] != my_proc) {
625  Teuchos::ArrayRCP<char> subview(
626  &plan->getRecvBuff().getRawPtr()[plan->starts_from[i] * nbytes],
627  0, plan->lengths_from[i] * nbytes, false);
628  plan->request[k] = Teuchos::ireceive(subview, plan->procs_from[i], tag, *comm_);
629  k++;
630  }
631  else {
632  self_recv_address = (size_t)(plan->starts_from[i]) * (size_t)nbytes;
633  }
634  }
635  }
636  else { /* Data of varying sizes */
637  int k = 0;
638  for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
639  if (plan->procs_from[i] != my_proc) {
640  if (plan->sizes_from[i]) {
641  Teuchos::ArrayRCP<char> subview(
642  &plan->getRecvBuff().getRawPtr()[(size_t)(plan->starts_from_ptr[i])
643  * (size_t)nbytes],
644  0, plan->sizes_from[i] * nbytes, false);
645  plan->request[k] = Teuchos::ireceive(subview, plan->procs_from[i], tag, *comm_);
646  }
647  else {
648  plan->request[k] = Teuchos::null;
649  }
650  k++;
651  }
652  else {
653  self_recv_address =
654  (size_t)(plan->starts_from_ptr[i]) * (size_t)nbytes;
655  }
656  }
657  }
658 
659  Teuchos::ArrayRCP<char> send_buff;
660  if(plan->indices_to != Teuchos::null) {
661  send_buff = Teuchos::arcp(new char[plan->max_send_size * nbytes], 0, plan->max_send_size * nbytes, true);
662  }
663 
664  /* Barrier to ensure irecvs are posted before doing any sends. */
665  /* Simultaneously see if anyone out of memory */
666  int out_of_mem = 0;
667  // WARNING - do not delete this without adding a proper barrier as a replacement.
668  // The memory handling is being refactored, so the new version probably won't
669  // use out_of_mem, but a barrier must still be preserved here or we get
670  // intermittent failures.
671  // The memory reduce is kept for now since we may end up with memory
672  // handling anyway.
673  int global_out_of_mem;
674  Teuchos::reduceAll(*plan->comm, Teuchos::REDUCE_SUM, 1, &out_of_mem,
675  &global_out_of_mem);
676 
677  /* Send out data */
678 
679  /* Scan through procs_to list to start w/ higher numbered procs */
680  /* This should balance message traffic. */
681 
682  int nblocks = plan->nsends + plan->self_msg; /* # procs needing my data */
683  int proc_index = 0; /* loop counter over procs to send to */
684  while (proc_index < nblocks && plan->procs_to[proc_index] < my_proc) {
685  proc_index++;
686  }
687  if (proc_index == nblocks) {
688  proc_index = 0;
689  }
690 
691  if (!plan->using_sizes) { /* Data all of same size */
692  if (plan->indices_to == Teuchos::null) { /* data already blocked by processor. */
693  int self_num = 0; /* where in send list my_proc appears */
694  for (int i = proc_index, j = 0; j < nblocks; j++) {
695  if (plan->procs_to[i] != my_proc) {
696  Teuchos::ArrayRCP<char> subview(
697  &send_data[plan->starts_to[i] * nbytes],
698  0, plan->lengths_to[i] * nbytes, false);
699  Teuchos::readySend(subview.getRawPtr(), static_cast<int>(subview.size()), plan->procs_to[i], tag, *comm_);
700  }
701  else {
702  self_num = i;
703  }
704  if (++i == nblocks) {
705  i = 0;
706  }
707  }
708 
709  if (plan->self_msg) { /* Copy data to self. */
710  /* I use array+offset instead of &(array[offset]) because of
711  a bug with PGI v9 */
712  /* I use memmove because I'm not sure that the pointers don't
713  overlap. */
714 
715  // TODO: Refactor to make C++ and remove getRawPtr, etc
716  memmove(
717  plan->getRecvBuff().getRawPtr()+self_recv_address,
718  send_data.getRawPtr()+(size_t)(plan->starts_to[self_num])*(size_t)nbytes,
719  (size_t) (plan->lengths_to[self_num]) * (size_t) nbytes);
720  }
721  }
722  else { /* Not blocked by processor. Need to buffer. */
723  int self_index = 0; /* send offset for data I'm keeping */
724  int self_num = 0; /* where in send list my_proc appears */
725  for (int i = proc_index, jj = 0; jj < nblocks; jj++) {
726  if (plan->procs_to[i] != my_proc) {
727  /* Need to pack message first. */
728  size_t offset = 0; /* offset into array I'm copying into */
729  int j = plan->starts_to[i];
730  for (int k = 0; k < plan->lengths_to[i]; k++) {
731  memcpy(&send_buff[offset],
732  &send_data[(size_t)(plan->indices_to[j++]) * (size_t)nbytes], nbytes);
733  offset += nbytes;
734  }
735  Teuchos::readySend(&send_buff[0], plan->lengths_to[i] * nbytes,
736  plan->procs_to[i], tag, *comm_);
737  }
738  else {
739  self_num = i;
740  self_index = plan->starts_to[i];
741  }
742  if (++i == nblocks)
743  i = 0;
744  }
745 
746  if (plan->self_msg) { /* Copy data to self. */
747  for (int k = 0; k < plan->lengths_to[self_num]; k++) {
748  memcpy(&(plan->getRecvBuff())[self_recv_address],
749  &send_data[
750  (size_t)(plan->indices_to[self_index++]) * (size_t)nbytes],
751  nbytes);
752  self_recv_address += nbytes;
753  }
754  }
755  }
756  }
757  else { /* Data of differing sizes */
758  if (plan->indices_to == Teuchos::null) { /* data already blocked by processor. */
759  int self_num = 0; /* where in send list my_proc appears */
760  for (int i = proc_index, j = 0; j < nblocks; j++) {
761  if (plan->procs_to[i] != my_proc) {
762  if (plan->sizes_to[i]) {
763  Teuchos::readySend(&send_data[plan->starts_to_ptr[i] * nbytes], plan->sizes_to[i] * nbytes,
764  plan->procs_to[i], tag, *comm_);
765  }
766  }
767  else
768  self_num = i;
769 
770  if (++i == nblocks)
771  i = 0;
772  }
773  if (plan->self_msg) { /* Copy data to self. */
774  if (plan->sizes_to[self_num]) {
775  char* lrecv = &plan->getRecvBuff().getRawPtr()[self_recv_address];
776  const char* lsend =
777  &send_data.getRawPtr()[(size_t)(plan->starts_to_ptr[self_num]) * (size_t)nbytes];
778  int sindex = plan->sizes_to[self_num], idx;
779  for (idx=0; idx<nbytes; idx++) {
780  memcpy(lrecv, lsend, sindex);
781  lrecv += sindex;
782  lsend += sindex;
783  }
784  }
785  }
786  }
787  else { /* Not blocked by processor. Need to buffer. */
788  int self_num = 0; /* where in send list my_proc appears */
789  for (int i = proc_index, jj = 0; jj < nblocks; jj++) {
790  if (plan->procs_to[i] != my_proc) {
791  size_t offset = 0; /* offset into array I'm copying into */
792  int j = plan->starts_to[i];
793  for (int k = 0; k < plan->lengths_to[i]; k++) {
794  if (plan->sizes[plan->indices_to[j]]) {
795  memcpy(&send_buff[offset],
796  &send_data[(size_t)(plan->indices_to_ptr[j]) * (size_t)nbytes],
797  (size_t)(plan->sizes[plan->indices_to[j]]) * (size_t)nbytes);
798  offset +=
799  (size_t)(plan->sizes[plan->indices_to[j]]) * (size_t)nbytes;
800  }
801  j++;
802  }
803  if (plan->sizes_to[i]) {
804  Teuchos::readySend(&send_buff[0], plan->sizes_to[i] * nbytes,
805  plan->procs_to[i], tag, *comm_);
806  }
807  }
808  else
809  self_num = i;
810 
811  if (++i == nblocks)
812  i = 0;
813  }
814  if (plan->self_msg) { /* Copy data to self. */
815  if (plan->sizes_to[self_num]) {
816  int j = plan->starts_to[self_num];
817  for (int k = 0; k < plan->lengths_to[self_num]; k++) {
818  int kk = plan->indices_to_ptr[j];
819  char* lrecv = &(plan->getRecvBuff())[self_recv_address];
820  size_t send_idx = (size_t)kk * (size_t)nbytes;
821  const char* lsend = &send_data[send_idx];
822  int sindex = plan->sizes[plan->indices_to[j]], idx;
823  for (idx=0; idx<nbytes; idx++) {
824  memcpy(lrecv, lsend, sindex);
825  lrecv += sindex;
826  lsend += sindex;
827  }
828  self_recv_address += (size_t)(plan->sizes[plan->indices_to[j]])
829  * (size_t) nbytes;
830  j++;
831  }
832  }
833  }
834  }
835  }
836 
837  return 0;
838 }
839 
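/* do_wait completes the exchange started by do_post: when the receives were
 * posted straight into recv_data there is nothing left to do but wait;
 * otherwise each completed message (and the self-message) is unpacked from
 * the internal buffer into the position recv_data expects, using indices_from
 * for fixed-size items or starts_from_ptr / sizes_from for variable sizes. */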
840 int Zoltan2_Directory_Comm::do_wait(
841  Zoltan2_Directory_Plan *plan, /* communication data structure */
842  int /* tag */, /* message tag for communicating */
843  const Teuchos::ArrayRCP<char> &/* send_data */, /* array of data I currently own */
844  int nbytes, /* msg size */
845  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
846 {
847  /* If not point-to-point, we currently do synchronous communications */
848  if (plan->maxed_recvs){
849  /* Do nothing */
850  return 0;
851  }
852 
853  int my_proc = plan->comm->getRank(); /* processor ID */
854 
855  /* Wait for messages to arrive & unpack them if necessary. */
856  /* Note: since request is in plan, could wait in later routine. */
857  if (plan->indices_from == Teuchos::null) { /* No copying required */
858  if (plan->nrecvs > 0) {
859  Teuchos::waitAll(*comm_, plan->request());
860  }
861  }
862  else { /* Need to copy into recv_data. */
863  int self_num; /* where in send list my_proc appears */
864  size_t offsetDst = 0;
865  if (plan->self_msg) { /* Unpack own data before waiting */
866  for (self_num = 0; self_num < plan->nrecvs + plan->self_msg;
867  self_num++) {
868  if (plan->procs_from[self_num] == my_proc) {
869  break;
870  }
871  }
872 
873  if(plan->sizes_from.size()) {
874  // NEW METHOD for variable sized data
875  // This will NOT put the data in order, but during the directory read
876  // of the buffer which follows, the data gets sorted anyway since each
877  // element has an index to tell where it belongs. It is not clear yet
878  // whether we must sort here, or whether it's fine to let the sort happen
879  // through the index value. This needs further discussion.
880  memcpy(&recv_data[offsetDst * (size_t)nbytes],
881  &(plan->getRecvBuff())[plan->starts_from_ptr[self_num] * (size_t)nbytes],
882  plan->sizes_from[self_num] * (size_t)nbytes);
883  offsetDst += plan->sizes_from[self_num];
884  }
885  else {
886  int k = plan->starts_from[self_num];
887  for (int j = plan->lengths_from[self_num]; j; j--) {
888  memcpy(&recv_data[(size_t)(plan->indices_from[k]) * (size_t)nbytes],
889  &(plan->getRecvBuff())[(size_t)k * (size_t)nbytes], nbytes);
890  k++;
891  }
892  }
893  }
894  else {
895  self_num = plan->nrecvs;
896  }
897 
898  for (int jj = 0; jj < plan->nrecvs; jj++) {
899  // TODO: The directory was refactored to use Teuchos comm, but there is no
900  // Teuchos::waitAny, so the short-term fix is to call wait() on each request
901  // in turn. When a Teuchos::waitAny is available it can replace the old
902  // MPI_Waitany form, which is commented out below.
903  plan->request[jj]->wait();
904  int index = jj;
905 
906  // Old form with MPI_Waitany
907  // MPI_Status status; /* return from Waitany */
908  // int index;
909  // MPI_Waitany(plan->nrecvs, &plan->request[0], &index, &status);
910 
911  if (index >= self_num) {
912  index++;
913  }
914 
915  if(plan->sizes_from.size()) {
916  // NEW METHOD for variable sized data
917  // This will NOT put the data in order, but during the directory read
918  // of the buffer which follows, the data gets sorted anyway since each
919  // element has an index to tell where it belongs. It is not clear yet
920  // whether we must sort here, or whether it's fine to let the sort happen
921  // through the index value. This needs further discussion.
922  memcpy(&recv_data[offsetDst * (size_t)nbytes],
923  &plan->getRecvBuff().getRawPtr()[plan->starts_from_ptr[index] * (size_t)nbytes],
924  plan->sizes_from[index] * (size_t)nbytes);
925  offsetDst += plan->sizes_from[index];
926  }
927  else {
928  int k = plan->starts_from[index];
929  for (int j = plan->lengths_from[index]; j; j--) {
930  memcpy(&recv_data.getRawPtr()[(size_t)(plan->indices_from[k]) * (size_t)nbytes],
931  &plan->getRecvBuff().getRawPtr()[(size_t)k * (size_t)nbytes], nbytes);
932  k++;
933  }
934  }
935  }
936  }
937 
938  return 0;
939 }
940 
941 /* Do_Post would require posting more receives than allowed on this platform.
942 * We use MPI_AlltoAllv instead, which is probably implemented such that each
943 * process does one receive at a time.
944 */
945 
946 int Zoltan2_Directory_Comm::do_all_to_all(
947  Zoltan2_Directory_Plan *plan, /* communication data structure */
948  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
949  int nbytes, /* msg size */
950  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
951 {
952  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (6)"); // needs unit testing
953 
954  int sm = (plan->self_msg > 0) ? 1 : 0;
955 
956  int nSendMsgs = plan->nsends + sm;
957  int nRecvMsgs = plan->nrecvs + sm;
958 
959  int nSendItems = 0;
960  for (int i=0; i <nSendMsgs; i++) {
961  nSendItems += plan->lengths_to[i];
962  }
963  int nRecvItems = 0;
964  for (int i=0; i <nRecvMsgs; i++) {
965  nRecvItems += plan->lengths_from[i];
966  }
967 
968  int nprocs = plan->comm->getSize();
969 
970  Teuchos::ArrayRCP<int> outbufCounts(new int[nprocs], 0, nprocs, true);
971  Teuchos::ArrayRCP<int> outbufOffsets(new int[nprocs], 0, nprocs, true);
972  Teuchos::ArrayRCP<int> inbufCounts(new int[nprocs], 0, nprocs, true);
973  Teuchos::ArrayRCP<int> inbufOffsets(new int[nprocs], 0, nprocs, true);
974 
975  /* The *_to fields of the plan refer to the items in the send_data buffer,
976  * and how to pull out the correct items for each receiver. The
977  * *_from fields of the plan refer to the recv_data buffer. Items
978  * arrive in process rank order, and these fields tell us where to
979  * put them in the recv_data buffer.
980  */
981 
982  /* CREATE SEND BUFFER */
983 
984  int sorted = 0;
985  if (plan->indices_to == Teuchos::null){
986  sorted = 1;
987  for (int i=1; i< nSendMsgs; i++){
988  if (plan->starts_to[i] < plan->starts_to[i-1]){
989  sorted = 0;
990  break;
991  }
992  }
993  }
994 
995  Teuchos::ArrayRCP<char> outbuf;
996  Teuchos::ArrayRCP<char> inbuf;
997  Teuchos::ArrayRCP<char> buf;
998 
999  if (plan->sizes_to.size()){
1000  /*
1001  * Each message contains items for a process, and each item may be
1002  * a different size.
1003  */
1004 
1005  int outbufLen = 0;
1006  for (int i = 0; i < nSendMsgs; i++){
1007  outbufLen += plan->sizes_to[i];
1008  }
1009 
1010  if (plan->indices_to != Teuchos::null) {
1011  /*
1012  * items are not grouped by message
1013  */
1014  buf.resize(outbufLen*nbytes);
1015  outbuf.resize(outbufLen*nbytes);
1016  char * pBufPtr = &(outbuf[0]);
1017  int i = 0;
1018  int k = 0;
1019  for (int p = 0; p < nprocs; p++) {
1020 
1021  int length = 0;
1022 
1023  if (i < nSendMsgs){
1024  if (plan->procs_to[i] == p){ /* procs_to is sorted */
1025 
1026  for (int j=0; j < plan->lengths_to[i]; j++,k++){
1027  int itemSize = plan->sizes[plan->indices_to[k]] * nbytes;
1028  int offset = plan->indices_to_ptr[k] * nbytes;
1029 
1030  memcpy(pBufPtr, &(send_data[0]) + offset, itemSize);
1031 
1032  pBufPtr += itemSize;
1033  length += itemSize;
1034  }
1035  i++;
1036  }
1037  }
1038 
1039  outbufCounts[p] = length;
1040  if (p){
1041  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1042  }
1043  }
1044  }
1045  else{
1046  /*
1047  * items are stored contiguously for each message
1048  */
1049 
1050  if (!sorted || (plan->nvals > nSendItems) ){
1051  buf.resize(outbufLen*nbytes);
1052  outbuf.resize(outbufLen*nbytes);
1053  }
1054  else{
1055  /* All items in send_data are being sent, and they are sorted
1056  * in process rank order.
1057  */
1058  // TODO: Optimize - original just set the ptr...
1059  for(int n = 0; n < outbufLen*nbytes; ++n) {
1060  outbuf[n] = send_data[n];
1061  }
1062  }
1063 
1064  char * pBufPtr = &(outbuf[0]);
1065 
1066  int i = 0;
1067  for (int p = 0; p < nprocs; p++) {
1068 
1069  int length = 0;
1070 
1071  if (i < nSendMsgs){
1072  if (plan->procs_to[i] == p){ /* procs_to is sorted */
1073  length = plan->sizes_to[i] * nbytes;
1074  int offset = plan->starts_to_ptr[i] * nbytes;
1075 
1076  if ((!sorted || (plan->nvals > nSendItems)) && length){
1077  memcpy(pBufPtr, &(send_data[0]) + offset, length);
1078  pBufPtr += length;
1079  }
1080  i++;
1081  }
1082  }
1083 
1084  outbufCounts[p] = length;
1085  if (p){
1086  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1087  }
1088  }
1089  }
1090  }
1091  else if (plan->indices_to != Teuchos::null) {
1092  /*
1093  * item sizes are constant, however the items belonging in a given
1094  * message may not be contiguous in send_data
1095  */
1096  buf.resize(nSendItems*nbytes);
1097  outbuf.resize(nSendItems*nbytes);
1098  char * pBufPtr = &(outbuf[0]);
1099  int i = 0;
1100  int k = 0;
1101  for (int p = 0; p < nprocs; p++){
1102 
1103  int length = 0;
1104 
1105  if (i < nSendMsgs){
1106  if (plan->procs_to[i] == p){ /* procs_to is sorted */
1107  for (int j=0; j < plan->lengths_to[i]; j++,k++) {
1108  int offset = plan->indices_to[k] * nbytes;
1109  memcpy(pBufPtr, &(send_data[0]) + offset, nbytes);
1110  pBufPtr += nbytes;
1111  }
1112  length = plan->lengths_to[i] * nbytes;
1113  i++;
1114  }
1115  }
1116 
1117  outbufCounts[p] = length;
1118  if (p){
1119  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1120  }
1121  }
1122  }
1123  else{
1124 
1125  /* item sizes are constant, and items belonging to a
1126  * given message are always stored contiguously in send_data
1127  */
1128 
1129  if (!sorted || (plan->nvals > nSendItems)){
1130  buf.resize(nSendItems*nbytes);
1131  outbuf.resize(nSendItems*nbytes);
1132  }
1133  else{
1134  /* send_data is sorted by process, and we don't skip
1135  * any of the data in the buffer, so we can use send_data
1136  * in the alltoall call
1137  */
1138  // TODO: Optimize - original just set ptr
1139  outbuf = send_data;
1140  }
1141 
1142  char * pBufPtr = &(outbuf[0]);
1143 
1144  int i = 0;
1145  for (int p=0; p < nprocs; p++) {
1146 
1147  int length = 0;
1148 
1149  if (i < nSendMsgs){
1150  if (plan->procs_to[i] == p){ /* procs_to is sorted */
1151  int offset = plan->starts_to[i] * nbytes;
1152  length = plan->lengths_to[i] * nbytes;
1153 
1154  if ((!sorted || (plan->nvals > nSendItems)) && length){
1155  memcpy(pBufPtr, &(send_data[0]) + offset, length);
1156  pBufPtr += length;
1157  }
1158  i++;
1159  }
1160  }
1161 
1162  outbufCounts[p] = length;
1163  if (p){
1164  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1165  }
1166  }
1167  }
1168 
1169  /* CREATE RECEIVE BUFFER */
1170 
1171  sorted = 0;
1172  int i = 0; /* reused below to step through the receive messages; must start at 0 */
1173  if (plan->indices_from == Teuchos::null) {
1174  sorted = 1;
1175  for (int ii = 1; ii < nRecvMsgs; ii++) {
1176  if (plan->starts_from[ii] < plan->starts_from[ii-1]){
1177  sorted = 0;
1178  break;
1179  }
1180  }
1181  }
1182 
1183  if (sorted){
1184  /* Caller already expects received data to be ordered by
1185  * the sending process rank.
1186  */
1187 
1188  // TODO: Optimize - original just set ptr
1189  outbuf = send_data;
1190  inbuf = recv_data;
1191  }
1192  else {
1193  inbuf.resize(plan->total_recv_size * nbytes);
1194  }
1195 
1196  for (int p = 0; p < nprocs; p++) {
1197  int length = 0;
1198 
1199  if (i < nRecvMsgs){
1200  if (plan->procs_from[i] == p){
1201 
1202  if (!plan->using_sizes){
1203  length = plan->lengths_from[i] * nbytes;
1204  }
1205  else{
1206  length = plan->sizes_from[i] * nbytes;
1207  }
1208  i++;
1209  }
1210  }
1211 
1212  inbufCounts[p] = length;
1213  if (p){
1214  inbufOffsets[p] = inbufOffsets[p-1] + inbufCounts[p-1];
1215  }
1216  }
1217 
1218  /* EXCHANGE DATA */
1219  // Did not refactor this - need Teuchos form but not tested code
1220  // MPI_Alltoall(&(sendbuf[0]), 1, MPI_INT, &(recvbuf[0]), 1, MPI_INT,
1221  // getRawComm());
1222 
1223  /* WRITE RECEIVED DATA INTO USER'S BUFFER WHERE IT'S EXPECTED */
1224 
1225  if (!sorted){
1226 
1227  char * pBufPtr = &(inbuf[0]);
1228 
1229  if (!plan->using_sizes){
1230 
1231  /* each item in each message is nbytes long */
1232 
1233  if (plan->indices_from == Teuchos::null) {
1234  for (i=0; i < nRecvMsgs; i++){
1235  int offset = plan->starts_from[i] * nbytes;
1236  int length = plan->lengths_from[i] * nbytes;
1237  memcpy(&(recv_data[0]) + offset, pBufPtr, length);
1238  pBufPtr += length;
1239  }
1240  }
1241  else{
1242  int k = 0;
1243  for (i=0; i < nRecvMsgs; i++) {
1244 
1245  for (int j=0; j < plan->lengths_from[i]; j++,k++){
1246  int offset = plan->indices_from[k] * nbytes;
1247  memcpy(&(recv_data[0]) + offset, pBufPtr, nbytes);
1248  pBufPtr += nbytes;
1249  }
1250  }
1251  }
1252  }
1253  else{ /* (sizes!=NULL) && (indices_from!=NULL) not allowed by Zoltan_Comm_Resize */
1254 
1255  /* items can be different sizes */
1256 
1257  for (i=0; i < nRecvMsgs; i++){
1258  int offset = plan->starts_from_ptr[i] * nbytes;
1259  int length = plan->sizes_from[i] * nbytes;
1260  memcpy(&(recv_data[0]) + offset, pBufPtr, length);
1261  pBufPtr += length;
1262  }
1263  }
1264  }
1265 
1266  return 0;
1267 }
1268 
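/* do_reverse sends data back along the plan: it builds plan_reverse by
 * inverting the forward plan (getInvertedValues), resizes it for the per-item
 * sizes passed in, allocates recv_data to the reverse plan's total receive
 * size, then reuses do_post / do_wait (or the all-to-all fallback) and finally
 * discards the reverse plan. */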
1269 int Zoltan2_Directory_Comm::do_reverse(
1270  int tag, /* message tag for communicating */
1271  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
1272  int nbytes, /* msg size */
1273  const Teuchos::ArrayRCP<int> &sizes,
1274  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after reverse comm */
1275 {
1276  /* create plan->plan_reverse
1277  */
1278  int status = create_reverse_plan(tag, sizes);
1279 
1280  // NEW METHOD
1281  // Set up recv_data with the proper size
1282  // This information is only available after the above create_reverse_plan is
1283  // called, so we can set up the return data with the proper buffer size now.
1284  // However should we do this here?
1285  size_t new_size = plan_forward->plan_reverse->total_recv_size*nbytes;
1286  if(new_size > 0) {
1287  // have to be careful with new[0], which can for example turn a valid NULL
1288  // read from arrayRCP.getRawPtr() into a debug assert failure.
1289  recv_data = Teuchos::arcp(new char[new_size], 0, new_size, true);
1290  }
1291 
1292  if (status == 0) {
1293 
1294  if (plan_forward->plan_reverse->maxed_recvs) {
1295 
1296  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (7)"); // needs unit testing
1297 
1298  /* use MPI_Alltoallv to implement plan->plan_reverse, because comm_do_post
1299  * would post more receives than allowed on this machine
1300  */
1301 
1302  status = do_all_to_all(plan_forward->plan_reverse, send_data,
1303  nbytes, recv_data);
1304  }
1305  else {
1306  /* use post/wait which is faster when each sends to few
1307  */
1308  status = do_post(plan_forward->plan_reverse, tag, send_data,
1309  nbytes, recv_data);
1310 
1311  if (status == 0) {
1312  status = do_wait (plan_forward->plan_reverse, tag, send_data,
1313  nbytes, recv_data);
1314  }
1315  }
1316  }
1317 
1318  free_reverse_plan(plan_forward);
1319 
1320  return status;
1321 }
1322 
1323 void Zoltan2_Directory_Comm::free_reverse_plan(Zoltan2_Directory_Plan *plan)
1324 {
1325  if(!plan) {
1326  throw std::logic_error("Plan is NULL!");
1327  }
1328  delete plan->plan_reverse;
1329  plan->plan_reverse = NULL;
1330 }
1331 
1332 int Zoltan2_Directory_Comm::create_reverse_plan(
1333  int tag,
1334  const Teuchos::ArrayRCP<int> &sizes)/* variable size of objects (if not size 0) */
1335 {
1336  /* Check input parameters */
1337  if (!plan_forward){
1338  throw std::logic_error("memory error");
1339  }
1340 
1341  /* Let Zoltan_Comm_Do check the remaining parameters. */
1342  plan_forward->plan_reverse = new Zoltan2_Directory_Plan;
1343  plan_forward->plan_reverse->getInvertedValues(plan_forward);
1344 
1345  if (MPI_RECV_LIMIT > 0){
1346  /* If we have a limit to the number of posted receives we are allowed,
1347  ** and our plan has exceeded that, then switch to an MPI_Alltoallv so
1348  ** that we will have fewer receives posted when we do the communication.
1349  */
1350  int global_nsends;
1351  Teuchos::reduceAll<int>(*plan_forward->comm, Teuchos::REDUCE_SUM, 1,
1352  &plan_forward->nsends, &global_nsends);
1353  if (global_nsends > MPI_RECV_LIMIT){
1354  plan_forward->plan_reverse->maxed_recvs = 1;
1355  }
1356  }
1357 
1358  if (plan_forward->plan_reverse->maxed_recvs == 0) {
1359  // See notes in header for MPI_Request
1360  // plan_forward->plan_reverse->request = Teuchos::arcp(new MPI_Request[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
1361  // plan_forward->plan_reverse->status = Teuchos::arcp(new MPI_Status[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
1362  plan_forward->plan_reverse->request.resize(plan_forward->plan_reverse->nrecvs);
1363  }
1364 
1365  int sum_recv_sizes;
1366  int comm_flag = resize( plan_forward->plan_reverse,
1367  sizes, tag, &sum_recv_sizes);
1368 
1369  if (comm_flag != 0) {
1370  return(comm_flag);
1371  }
1372 
1373  if (sum_recv_sizes != plan_forward->plan_reverse->total_recv_size){
1374  /* Sanity check */
1375  return 1;
1376  }
1377 
1378  return 0;
1379 }
1380 
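/* resize switches a plan to variable-sized items: given one size per locally
 * owned value, it accumulates per-message size totals (sizes_to / sizes_from),
 * computes offsets into the packed buffers (starts_to_ptr / starts_from_ptr /
 * indices_to_ptr), exchanges the totals with the receiving ranks via
 * exchange_sizes, and reports the summed size this rank will receive. The
 * public overload below applies this to the forward plan; create_reverse_plan
 * calls the private overload on the reverse plan. */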
1381 int Zoltan2_Directory_Comm::resize(
1382  const Teuchos::ArrayRCP<int> &sizes, /* size of each item I'm sending */
1383  int tag, /* message tag I can use */
1384  int *sum_recv_sizes) /* sum of the sizes of the items I'll receive */
1385 {
1386  return resize(plan_forward, sizes, tag, sum_recv_sizes);
1387 }
1388 
1389 int Zoltan2_Directory_Comm::resize(
1390  Zoltan2_Directory_Plan *plan, /* communication plan object */
1391  const Teuchos::ArrayRCP<int> &sizes, /* size of each item I'm sending */
1392  int tag, /* message tag I can use */
1393  int *sum_recv_sizes) /* sum of the sizes of the items I'll receive */
1394 {
1395  /* If sizes vary, then I need to compute and communicate message lengths */
1396  /* First check if sizes array is NULL on all procs. */
1397  int my_proc = plan->comm->getRank(); /* my processor ID */
1398  int has_sizes = (sizes.size() != 0);
1399  int var_sizes; /* items have variable sizes? */
1400 
1401  Teuchos::reduceAll(*comm_, Teuchos::REDUCE_BOR, 1, &has_sizes, &var_sizes);
1402 
1403  if (var_sizes && plan->indices_from != Teuchos::null) {
1404  // NEW METHOD
1405  // Allow this to run now - the implementation below works, but it may not be
1406  // correct for other usage cases that have not been considered yet.
1407  // throw std::logic_error("Non-blocked, variable-sized recvs not supported");
1408  }
1409 
1410  int nsends = plan->nsends; /* number of msgs I'll send */
1411  int nrecvs = plan->nrecvs; /* number of msgs I'll recv */
1412  int self_msg = plan->self_msg;
1413 
1414  Teuchos::ArrayRCP<int> sizes_to;
1415  Teuchos::ArrayRCP<int> sizes_from;
1416  Teuchos::ArrayRCP<int> starts_to_ptr;
1417  Teuchos::ArrayRCP<int> starts_from_ptr;
1418  Teuchos::ArrayRCP<int> indices_to_ptr;
1419  Teuchos::ArrayRCP<int> indices_from_ptr;
1420 
1421  if (!var_sizes) { /* Easy case. Size = length */
1422  plan->total_recv_size = 0;
1423  for (int i = 0; i < nrecvs + self_msg; i++) {
1424  plan->total_recv_size += plan->lengths_from[i];
1425  }
1426 
1427  plan->max_send_size = 0;
1428  for (int i = 0; i < nsends + self_msg; i++) {
1429  if (plan->procs_to[i] != my_proc &&
1430  plan->lengths_to[i] > plan->max_send_size) {
1431  plan->max_send_size = plan->lengths_to[i];
1432  }
1433  }
1434  }
1435  else { /* Need to actually compute message sizes */
1436 
1437  // TODO Investigate purpose of the +1 in the old code. Is that used?
1438 
1439  // OLD CODE
1440  // plan->sizes.resize(plan->nvals + 1);
1441  // for (int i = 0; i < plan->nvals; i++) {
1442  // plan->sizes[i] = sizes[i];
1443  // }
1444 
1445  // NEW CODE
1446  plan->sizes = sizes; // can we just copy?
1447  plan->using_sizes = true;
1448 
1449  if(nsends + self_msg > 0) {
1450  sizes_to = Teuchos::arcp(
1451  new int[nsends + self_msg], 0, nsends + self_msg, true);
1452  for(int n = 0; n < sizes_to.size(); ++n) {
1453  sizes_to[n] = 0;
1454  }
1455  }
1456  if(nrecvs + self_msg > 0) {
1457  sizes_from = Teuchos::arcp(
1458  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1459  }
1460 
1461  /* Several cases:
1462  1. indices_to == NULL
1463  => starts_to != NULL, need to allocate, set starts_to_ptr
1464  2. indices_to != NULL (=> starts_to == NULL)
1465  need to allocate, set indices_to_ptr
1466  3,4. mirror cases for _from
1467  */
1468  if(nsends + self_msg > 0) {
1469  starts_to_ptr = Teuchos::arcp(
1470  new int[nsends + self_msg], 0, nsends + self_msg, true);
1471  }
1472  if (plan->indices_to == Teuchos::null) {
1473  /* Simpler case; sends already blocked by processor */
1474  Teuchos::ArrayRCP<int> index;
1475  Teuchos::ArrayRCP<int> sort_val;
1476  if(nsends + self_msg > 0) {
1477  index = Teuchos::arcp(
1478  new int[nsends + self_msg], 0, nsends + self_msg, true);
1479  sort_val = Teuchos::arcp(
1480  new int[nsends + self_msg], 0, nsends + self_msg, true);
1481  }
1482  for (int i = 0; i < nsends + self_msg; i++) {
1483  int j = plan->starts_to[i];
1484 
1485  for (int k = 0; k < plan->lengths_to[i]; k++) {
1486  sizes_to[i] += sizes[j++];
1487  }
1488  if (sizes_to[i] > plan->max_send_size &&
1489  plan->procs_to[i] != my_proc)
1490  plan->max_send_size = sizes_to[i];
1491  }
1492  for (int i = 0; i < nsends + self_msg; i++) {
1493  sort_val[i] = plan->starts_to[i];
1494  index[i] = i;
1495  }
1496  sort_ints(sort_val, index);
1497  int sum = 0;
1498  for (int i = 0; i < nsends + self_msg; i++) {
1499  starts_to_ptr[index[i]] = sum;
1500  sum += sizes_to[index[i]];
1501  }
1502  }
1503  else { /* Harder case, sends not blocked */
1504  Teuchos::ArrayRCP<int> offset;
1505  if(plan->nvals > 0) {
1506  offset = Teuchos::arcp(new int[plan->nvals], 0, plan->nvals, true);
1507  }
1508  indices_to_ptr.resize(plan->nvals);
1509 
1510  /* Compute address for every item in send array */
1511  int sum = 0;
1512  for (int i = 0; i < plan->nvals; i++) {
1513  offset[i] = sum;
1514  sum += sizes[i];
1515  }
1516 
1517  sum = 0;
1518  plan->max_send_size = 0;
1519  for (int i = 0; i < nsends + self_msg; i++) {
1520  starts_to_ptr[i] = sum;
1521  int j = plan->starts_to[i];
1522  for (int k = 0; k < plan->lengths_to[i]; k++) {
1523  indices_to_ptr[j] = offset[plan->indices_to[j]];
1524  sizes_to[i] += sizes[plan->indices_to[j++]];
1525  }
1526  if (sizes_to[i] > plan->max_send_size &&
1527  plan->procs_to[i] != my_proc)
1528  plan->max_send_size = sizes_to[i];
1529  sum += sizes_to[i];
1530  }
1531  }
1532 
1533  /* Note: This routine only gets message sizes, not object sizes. */
1534  /* Anything requiring item sizes requires more code */
1535  /* Should such functionality reside here? */
1536  exchange_sizes(sizes_to, plan->procs_to, nsends, self_msg,
1537  sizes_from, plan->procs_from, nrecvs,
1538  &plan->total_recv_size, my_proc, tag, plan->comm);
1539 
1540  if(nrecvs + self_msg > 0) {
1541  starts_from_ptr = Teuchos::arcp(
1542  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1543  }
1544 
1545  if (plan->indices_from == Teuchos::null) {
1546  /* Simpler case; recvs already blocked by processor */
1547  Teuchos::ArrayRCP<int> index;
1548  Teuchos::ArrayRCP<int> sort_val;
1549  if(nrecvs + self_msg > 0) {
1550  index = Teuchos::arcp(
1551  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1552  sort_val = Teuchos::arcp<int>(
1553  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1554  }
1555 
1556  for (int i = 0; i < nrecvs + self_msg; i++) {
1557  sort_val[i] = plan->starts_from[i];
1558  index[i] = i;
1559  }
1560  sort_ints(sort_val, index);
1561 
1562  int sum = 0;
1563  for (int i = 0; i < nrecvs + self_msg; i++) {
1564  starts_from_ptr[index[i]] = sum;
1565  sum += sizes_from[index[i]];
1566  }
1567  }
1568  else {
1569  // OLD COMMENT left here for reference but to be deleted
1570  /* Harder case, recvs not blocked */
1571  /* Not currently supported */
1572  /* Can't do w/o individual item sizes */
1573  /* Currently checked for at top of file */
1574 
1575  // NEW METHOD
1576  // Note this is currently just a duplicate of the block above, which works
1577  // because do_wait has been set up to copy the entire block. However it is
1578  // not yet clear how to organize this implementation, and it may not be
1579  // correct in general - though it seems to work. After do_wait copies
1580  // the data (out of order) the directory will handle resorting it, since
1581  // each element has an index to indicate where it goes in the array. In the
1582  // original do_wait implementation it seemed to be set up to place the
1583  // elements in order, even though they would be sorted later, so that
1584  // part needs further discussion.
1585  Teuchos::ArrayRCP<int> index;
1586  Teuchos::ArrayRCP<int> sort_val;
1587  if(nrecvs + self_msg > 0) {
1588  index = Teuchos::arcp(
1589  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1590  sort_val = Teuchos::arcp(
1591  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1592  }
1593 
1594  for (int i = 0; i < nrecvs + self_msg; i++) {
1595  sort_val[i] = plan->starts_from[i];
1596  index[i] = i;
1597  }
1598  sort_ints(sort_val, index);
1599 
1600  int sum = 0;
1601  for (int i = 0; i < nrecvs + self_msg; i++) {
1602  starts_from_ptr[index[i]] = sum;
1603  sum += sizes_from[index[i]];
1604  }
1605  }
1606  }
1607  plan->sizes_to = sizes_to;
1608  plan->sizes_from = sizes_from;
1609  plan->starts_to_ptr = starts_to_ptr;
1610  plan->starts_from_ptr = starts_from_ptr;
1611  plan->indices_to_ptr = indices_to_ptr;
1612  plan->indices_from_ptr = indices_from_ptr;
1613 
1614  if (sum_recv_sizes) {
1615  *sum_recv_sizes = plan->total_recv_size;
1616  }
1617 
1618  return 0;
1619 }
1620 
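/* exchange_sizes is the variable-size analogue of the length exchange in
 * invert_map: each rank sends its per-message size total to every destination
 * and receives one total from every source (the self-message size is copied
 * locally), accumulating total_recv_size along the way. */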
1621 int Zoltan2_Directory_Comm::exchange_sizes(
1622  const Teuchos::ArrayRCP<int> &sizes_to, /* value I need to exchange (size of true msg) */
1623  const Teuchos::ArrayRCP<int> &procs_to, /* procs I send to */
1624  int nsends, /* number of messages I'll send */
1625  int self_msg, /* do I copy data to myself? */
1626  Teuchos::ArrayRCP<int> &sizes_from, /* (returned) size of all my receives */
1627  const Teuchos::ArrayRCP<int> &procs_from, /* procs I recv from */
1628  int nrecvs, /* number of messages I receive */
1629  int *total_recv_size, /* (returned) sum of all incoming sizes */
1630  int my_proc, /* my processor number */
1631  int tag, /* message tag I can use */
1632  Teuchos::RCP<const Teuchos::Comm<int> > /* comm */) { /* communicator */
1633 
1634  /* If sizes vary, then I need to communicate message lengths */
1635  int self_index_to = -1; /* location of self in procs_to */
1636  for (int i = 0; i < nsends + self_msg; i++) {
1637  if (procs_to[i] != my_proc) {
1638 #ifdef HAVE_MPI // Teuchos::send not implemented for Serial - Serial is just for debugging
1639  Teuchos::send(*comm_, 1, &sizes_to[i], procs_to[i]);
1640 #endif
1641  }
1642  else {
1643  self_index_to = i;
1644  }
1645  }
1646 
1647  *total_recv_size = 0;
1648 
1649  for (int i = 0; i < nrecvs + self_msg; i++) {
1650  if (procs_from[i] != my_proc) {
1651 #ifdef HAVE_MPI // Teuchos::receive not implemented for Serial - Serial is just for debugging
1652  Teuchos::receive(*comm_, procs_from[i], 1, &sizes_from[i]);
1653 #endif
1654  }
1655  else {
1656  sizes_from[i] = sizes_to[self_index_to];
1657  }
1658  *total_recv_size += sizes_from[i];
1659  }
1660  return 0;
1661 }
1662 
1663 } // end namespace Zoltan2