Zoltan2_Directory_Comm.cpp
1 // @HEADER
2 // *****************************************************************************
3 // Zoltan2: A package of combinatorial algorithms for scientific computing
4 //
5 // Copyright 2012 NTESS and the Zoltan2 contributors.
6 // SPDX-License-Identifier: BSD-3-Clause
7 // *****************************************************************************
8 // @HEADER
9 
10 #include "Zoltan2_Directory_Comm.hpp"
11 #include <stdexcept>
12 #include <memory>
13 
14 namespace Zoltan2 {
15 
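// Builds this plan as the inverse of 'from': the send-side and receive-side
// fields are swapped so that a reverse communication can reuse the forward
// plan's bookkeeping.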
16 void Zoltan2_Directory_Plan::getInvertedValues(Zoltan2_Directory_Plan *from) {
17  total_recv_size = 0;
18  for (int i = 0; i < from->nsends + from->self_msg; i++) {
19  total_recv_size += from->lengths_to[i];
20  }
21 
22  max_send_size = 0;
23  for (int i = 0; i < from->nrecvs; i++) {
24  if (from->lengths_from[i] > max_send_size) {
25  max_send_size = from->lengths_from[i];
26  }
27  }
28 
29  nvals = from->nvals_recv;
30  nvals_recv = from->nvals;
31  lengths_to = from->lengths_from;
32  procs_to = from->procs_from;
33  indices_to = from->indices_from;
34 
35  starts_to = from->starts_from;
36  lengths_from = from->lengths_to;
37  procs_from = from->procs_to;
38  indices_from = from->indices_to;
39 
40  starts_from = from->starts_to;
41  nrecvs = from->nsends;
42  nsends = from->nrecvs;
43  self_msg = from->self_msg;
44  comm = from->comm;
45 }
46 
47 
48 void Zoltan2_Directory_Plan::print(const std::string& headerMessage) const {
49 
50  #define PRINT_VECTOR(v) \
51  if(v != Teuchos::null) { \
52  std::cout << " " << #v << " "; \
53  for(Teuchos::ArrayRCP<int>::size_type n = 0; n < v.size(); ++n) { \
54  std::cout << v[n] << " "; \
55  } \
56  std::cout << std::endl; \
57  }
58 
59  #define PRINT_VAL(val) std::cout << " " << #val << ": " << val << std::endl;
60 
61  for(int proc = 0; proc < comm->getSize(); ++proc) {
62  comm->barrier();
63  if(proc == comm->getRank()) {
64 
65  std::cout << "Rank " << proc << " " << headerMessage << std::endl;
66 
82 
91  }
92  }
93  comm->barrier();
94 }
95 
96 Zoltan2_Directory_Comm::Zoltan2_Directory_Comm(
97  int nvals, /* number of values I currently own */
98  const Teuchos::ArrayRCP<int> &assign, /* processor assignment for all my values */
99  Teuchos::RCP<const Teuchos::Comm<int> > comm, /* communicator */
100  int tag) : /* message tag I can use */
101  comm_(comm),
102  plan_forward(NULL)
103 {
104  if (comm == Teuchos::null){
105  throw std::logic_error("Invalid communicator: MPI_COMM_NULL.");
106  }
107 
108  int my_proc = comm->getRank(); /* my processor tag in communicator */
109  int nprocs = comm->getSize(); /* number of processors in communicator */
110 
111  /* First check to see if items are grouped by processor with no gaps. */
112  /* If so, indices_to should be NULL (= identity) */
113 
114  /* Make data structures that will allow me to traverse arrays quickly. */
115  /* Begin by determining number of objects I'm sending to each processor. */
116  Teuchos::ArrayRCP<int> starts(new int[nprocs + 1], 0, nprocs + 1, true);
117  for(int n = 0; n < starts.size(); ++n) {
118  starts[n] = 0;
119  }
120 
121  /* Note: Negative assign value means ignore item. */
122  /* Non-trailing negatives mean data not packed so need send_buf. */
123  /* Could (but don't) allow negatives between processor blocks w/o buf. */
124  int nactive = 0; /* number of values to remap */
125 
126  int no_send_buff = 1; /* is data nicely grouped by processor? */
127 
128  int prev_proc = nprocs; /* processor on previous loop pass */
129 
130  for (int i = 0; i < nvals; i++) {
131  int proc = assign[i];
132  if (no_send_buff && proc != prev_proc) { /* Checks if blocked by proc */
133  if (proc >= 0 && (starts[proc] || prev_proc < 0)) {
134  no_send_buff = 0;
135  }
136  else {
137  prev_proc = proc;
138  }
139  }
140  if (proc >= 0) {
141  ++starts[proc];
142  ++nactive;
143  }
144  }
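  // Worked example (hypothetical input): with nprocs = 3 and
  // assign = {1, 1, 2, -1, 0}, the loop above leaves
  //   starts  = {1, 2, 1, 0}  (item counts per destination; last slot unused yet)
  //   nactive = 4             (the negative assignment at i = 3 is skipped)
  //   no_send_buff = 0        (the proc-0 item follows a non-trailing negative,
  //                            so the data is not blocked by processor)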
145 
146  int self_msg = (starts[my_proc] != 0); /* do I have data for myself? */
147 
148  Teuchos::ArrayRCP<int> lengths_to; /* lengths I'll send to */
149  Teuchos::ArrayRCP<int> procs_to; /* processors I'll send to */
150  Teuchos::ArrayRCP<int> starts_to; /* where in list my sends begin */
151  Teuchos::ArrayRCP<int> indices_to; /* local_id values I'll be sending */
152 
153  int max_send_size = 0; /* size of longest message I send */
154  int nsends = 0; /* # procs I'll send to (including self) */
155  int nrecvs = 0; /* # procs I'll recv from (including self) */
156 
157  if (no_send_buff) {
158  /* Grouped by processor. Array indices_to can be NULL (= identity) */
159  nsends = 0;
160  for (int i = 0; i < nprocs; i++) {
161  if (starts[i] != 0) ++nsends;
162  }
163 
164  lengths_to.resize(nsends);
165  starts_to.resize(nsends);
166  procs_to.resize(nsends);
167 
168  int index = 0; /* index into list of objects */
169  /* Note that procs_to is in the order the data was passed in. */
170  for (int i = 0; i < nsends; i++) {
171  starts_to[i] = index;
172  int proc = assign[index];
173  procs_to[i] = proc;
174  index += starts[proc];
175  }
176 
177  /* Now sort the outgoing procs. */
178  /* This keeps recvs deterministic if I ever invert communication */
179  /* It also allows for better balance of traffic in comm_do */
180  sort_ints(procs_to, starts_to);
181 
182  max_send_size = 0;
183  for (int i = 0; i < nsends; i++) {
184  int proc = procs_to[i];
185  lengths_to[i] = starts[proc];
186  if (proc != my_proc && lengths_to[i] > max_send_size) {
187  max_send_size = lengths_to[i];
188  }
189  }
190  }
191  else { /* Not grouped by processor. More complex data structures. */
192  /* Sum starts values to be offsets into indices_to array. */
193  nsends = (starts[0] != 0);
194  for (int i = 1; i < nprocs; i++) {
195  if (starts[i] != 0)
196  ++nsends;
197  starts[i] += starts[i - 1];
198  }
199 
200  for (int i = nprocs - 1; i; i--) {
201  starts[i] = starts[i - 1];
202  }
203 
204  starts[0] = 0;
205 
206  indices_to = (nactive > 0) ?
207  Teuchos::arcp(new int[nactive], 0, nactive, true) : Teuchos::null;
208 
209  for (int i = 0; i < nvals; i++) {
210  int proc = assign[i];
211  if (proc >= 0) {
212  indices_to[starts[proc]] = i;
213  ++starts[proc];
214  }
215  }
216 
217  /* Indices_to array now has the data in clumps for each processor. */
218  /* Now reconstruct starts array to index into indices_to. */
219  for (int i = nprocs - 1; i; i--) {
220  starts[i] = starts[i - 1];
221  }
222  starts[0] = 0;
223  starts[nprocs] = nactive;
224 
225  /* Construct lengths_to, starts_to and procs_to arrays. */
226  /* Note: If indices_to is needed, procs are in increasing order */
227  lengths_to.resize(nsends);
228  starts_to.resize(nsends);
229  procs_to.resize(nsends);
230 
231  int j = 0;
232  max_send_size = 0;
233  for (int i = 0; i < nprocs; i++) {
234  if (starts[i + 1] != starts[i]) {
235  starts_to[j] = starts[i];
236  lengths_to[j] = starts[i + 1] - starts[i];
237  if (i != my_proc && lengths_to[j] > max_send_size) {
238  max_send_size = lengths_to[j];
239  }
240  procs_to[j] = i;
241  j++;
242  }
243  }
244  }
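  // Continuing the hypothetical example (assign = {1, 1, 2, -1, 0}, nprocs = 3),
  // this branch produces a CSR-style layout:
  //   indices_to = {4, 0, 1, 2}   local ids grouped by destination processor
  //   starts     = {0, 1, 3, 4}   offsets of each processor's group in indices_to
  //   procs_to   = {0, 1, 2}, lengths_to = {1, 2, 1}, starts_to = {0, 1, 3}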
245 
246  /* Now change nsends to count only non-self messages */
247  nsends -= self_msg;
248 
249  /* Determine how many messages & what length I'll receive. */
250  Teuchos::ArrayRCP<int> lengths_from; /* lengths of messages I'll receive */
251  Teuchos::ArrayRCP<int> procs_from; /* processors I'll receive from */
252  int out_of_mem = 0; // TODO refactor this bit
253 
254  int comm_flag = invert_map(lengths_to, procs_to, nsends, self_msg,
255  lengths_from, procs_from, &nrecvs, my_proc, nprocs,
256  out_of_mem, tag, comm);
257 
258  /* pointers for where to put recv data */
259  Teuchos::ArrayRCP<int> starts_from(
260  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
261  int j = 0;
262  for (int i = 0; i < nrecvs + self_msg; i++) {
263  starts_from[i] = j;
264  j += lengths_from[i];
265  }
266 
267  if (comm_flag != 0) {
268  throw std::logic_error("Failed to construct Zoltan2_Directory_Comm");
269  }
270 
271  int total_recv_size = 0; /* total size of messages I recv */
272  for (int i = 0; i < nrecvs + self_msg; i++) {
273  total_recv_size += lengths_from[i];
274  }
275 
276  plan_forward = new Zoltan2_Directory_Plan;
277  plan_forward->lengths_to = lengths_to;
278  plan_forward->starts_to = starts_to;
279  plan_forward->procs_to = procs_to;
280  plan_forward->indices_to = indices_to;
281  plan_forward->lengths_from = lengths_from;
282  plan_forward->starts_from = starts_from;
283  plan_forward->procs_from = procs_from;
284  plan_forward->nvals = nvals;
285  plan_forward->nvals_recv = total_recv_size;
286  plan_forward->nrecvs = nrecvs;
287  plan_forward->nsends = nsends;
288  plan_forward->self_msg = self_msg;
289  plan_forward->max_send_size = max_send_size;
290  plan_forward->total_recv_size = total_recv_size;
291  plan_forward->maxed_recvs = 0;
292  plan_forward->comm = comm;
293 
294  if (MPI_RECV_LIMIT > 0) {
295 
296  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (1)"); // needs unit testing
297 
298  /* If we have a limit to the number of posted receives we are allowed,
299  ** and our plan has exceeded that, then switch to an MPI_Alltoallv so
300  ** that we will have fewer receives posted when we do the communication.
301  */
302  int global_nrecvs;
303  Teuchos::reduceAll(*comm, Teuchos::REDUCE_MAX, 1, &nrecvs, &global_nrecvs);
304  if (global_nrecvs > MPI_RECV_LIMIT){
305  plan_forward->maxed_recvs = 1;
306  }
307  }
308 
309  if (plan_forward->maxed_recvs == 0) {
310  // See notes in header for MPI_Request
311  plan_forward->request.resize(plan_forward->nrecvs);
312  }
313 
314  nrec = total_recv_size;
315 }
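// Minimal usage sketch (illustrative only; exact accessors live in the header):
//
//   Teuchos::ArrayRCP<int> assign = ...;      // destination rank per item, -1 = drop
//   Zoltan2_Directory_Comm dcomm(nvals, assign, comm, tag);
//   Teuchos::ArrayRCP<char> send_data = ...;  // nvals * nbytes of packed bytes
//   Teuchos::ArrayRCP<char> recv_data = ...;  // sized to hold nrec * nbytes
//   dcomm.do_forward(tag2, send_data, nbytes, recv_data);  // tag2: another free tag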
316 
317 Zoltan2_Directory_Comm::~Zoltan2_Directory_Comm()
318 {
319  delete plan_forward;
320 }
321 
322 int Zoltan2_Directory_Comm::invert_map(
323  const Teuchos::ArrayRCP<int> &lengths_to, /* number of items I'm sending */
324  const Teuchos::ArrayRCP<int> &procs_to, /* procs I send to */
325  int nsends, /* number of messages I'll send */
326  int self_msg, /* do I copy data to myself? */
327  Teuchos::ArrayRCP<int> &lengths_from, /* number of items I'm receiving */
328  Teuchos::ArrayRCP<int> &procs_from, /* procs I recv lengths from */
329  int *pnrecvs, /* number of messages I receive */
330  int my_proc, /* my processor number */
331  int nprocs, /* total number of processors */
332  int /* out_of_mem */, /* tell everyone I'm out of memory? */
333  int tag, /* message tag I can use */
334  Teuchos::RCP<const Teuchos::Comm<int> > comm) /* communicator */
335 {
336  Teuchos::ArrayRCP<int> msg_count(new int[nprocs], 0, nprocs, true);
337  Teuchos::ArrayRCP<int> counts(new int[nprocs], 0, nprocs, true);
338  for(int i = 0; i < nprocs; ++i) {
339  msg_count[i] = 0;
340  counts[i] = 1;
341  }
342 
343  for (int i = 0; i < nsends + self_msg; i++) {
344  msg_count[procs_to[i]] = 1;
345  }
346 
347  /*
348  * KDDKDD: Replaced MPI_Reduce_scatter with MPI_Reduce and MPI_Scatter
349  * KDDKDD: to avoid reported problems with MPICH 1.5.2.1.
350  * KDDKDD: Some sort of MPI_TYPE_INDEXED error.
351  * KDDKDD: Bug fix suggested by Clark Dohrmann and Rob Hoekstra.
352  * KDDKDD: July 20, 2004
353 
354  MPI_Reduce_scatter((void *) msg_count, (void *) &nrecvs, counts, MPI_INT,
355  MPI_SUM, comm);
356  */
357  Teuchos::reduceAll<int>(*comm, Teuchos::REDUCE_SUM, nprocs,
358  msg_count.getRawPtr(), counts.getRawPtr());
359 
360  int nrecvs = 0; /* number of messages I'll receive */
361 
362  Teuchos::scatter<int, int>(&(counts[0]), 1, &nrecvs, 1, 0, *comm);
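  // After the all-reduce, counts[p] is the number of ranks that send a message
  // to rank p (self messages included); the scatter from rank 0 then hands each
  // rank its own entry, so nrecvs = messages this rank will receive.
  // Example (hypothetical): with 3 ranks where rank 0 sends to {1}, rank 1
  // sends to {0, 2} and rank 2 sends to {2}, counts = {1, 1, 2}, so rank 2
  // posts two receives below.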
363 
364  int max_nrecvs = 0;
365  if (my_proc == 0) {
366  for (int i=0; i < nprocs; i++) {
367  if (counts[i] > max_nrecvs) {
368  max_nrecvs = counts[i];
369  }
370  }
371  }
372 
373  Teuchos::broadcast(*comm, 0, &max_nrecvs);
374 
375  if(nrecvs > 0) {
376  lengths_from.resize(nrecvs); /* number of items I'm receiving */
377  procs_from.resize(nrecvs); /* processors I'll receive from */
378  }
379 
380  if (MPI_RECV_LIMIT == 0 || max_nrecvs <= MPI_RECV_LIMIT) {
381  // See notes in header for MPI_Request
382  Teuchos::ArrayRCP<Teuchos::RCP<Teuchos::CommRequest<int> > > requests(nrecvs);
383 
384  /* Note: I'm counting on having a unique tag or some of my incoming */
385  /* messages might get confused with others. */
386  for (int i=0; i < nrecvs; i++) {
387 #ifdef HAVE_MPI // Teuchos::ireceive not implemented for Serial - Serial is just for debugging
388  Teuchos::ArrayRCP<int> single_elem(&lengths_from[i], 0, 1, false);
389  requests[i] = Teuchos::ireceive(single_elem, MPI_ANY_SOURCE, tag, *comm);
390 #endif
391  }
392 
393  for (int i=0; i < nsends+self_msg; i++) {
394 #ifdef HAVE_MPI // Teuchos::send not implemented for Serial - Serial is just for debugging
395  Teuchos::send(&lengths_to[i], 1, procs_to[i], tag, *comm);
396 #endif
397  }
398 
399  for (int i=0; i < nrecvs; i++) {
400 #ifdef HAVE_MPI
401  procs_from[i] = requests[i]->wait()->getSourceRank();
402 #else
403  // above Teuchos MPI calls not supported for Serial so manually do the transfer.
404  // We don't really need Serial for this class but it helps with debugging to have a serial test that can run.
405  lengths_from[i] = lengths_to[i];
406 #endif
407  }
408 
409  }
410  else { /* some large HPC machines have a limit on number of posted receives */
411  Teuchos::ArrayRCP<int> sendbuf(new int[nprocs], 0, nprocs, true);
412  Teuchos::ArrayRCP<int> recvbuf(new int[nprocs], 0, nprocs, true);
413 
414  for (int i=0; i < nsends + self_msg; i++) {
415  sendbuf[procs_to[i]] = lengths_to[i];
416  }
417 
418  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (2)"); // needs unit testing
419  // Did not refactor this - need Teuchos form but this is not tested code.
420  // MPI_Alltoall(&(sendbuf[0]), 1, MPI_INT, &(recvbuf[0]), 1, MPI_INT, getRawComm());
421 
422  for (int i=0, j=0; i < nprocs; i++) {
423  if (recvbuf[i] > 0){
424  lengths_from[j] = recvbuf[i];
425  procs_from[j] = i;
426  if (++j == nrecvs) {
427  break;
428  }
429  }
430  }
431  }
432 
433  /* Sort recv lists to keep execution deterministic (e.g. for debugging) */
434  sort_ints(procs_from, lengths_from);
435 
436  *pnrecvs = nrecvs - self_msg; /* Only return number of true messages */
437 
438  return 0;
439 }
440 
441 int Zoltan2_Directory_Comm::sort_ints(
442  Teuchos::ArrayRCP<int> &vals_sort, /* values to be sorted */
443  Teuchos::ArrayRCP<int> &vals_other) /* other array to be reordered w/ sort */
444 {
445  // TODO: Check - perhaps we can skip all of these for efficiency
446  if (vals_sort == Teuchos::null || vals_sort.size() == 0) {
447  return 1;
448  }
449  if (vals_other == Teuchos::null || vals_other.size() == 0) {
450  return 1;
451  }
452  if (vals_sort == Teuchos::null || vals_sort.size() == 1) {
453  return 0; /* fastest way to sort 1 item is to return */
454  }
455 
456  /* find largest value (sort sometimes used for non processor lists) */
457  int already_sorted = 1; /* flag indicating whether vals_sort is
458  already sorted; can exit early and skip
459  memory allocations if it is. */
460  int top = vals_sort[0]; /* largest integer to sort, smallest is assumed 0 */
461  for (Teuchos::ArrayRCP<int>::size_type i = 1; i < vals_sort.size(); i++) {
462  if (vals_sort[i-1] > vals_sort[i]) {
463  already_sorted = 0;
464  }
465  if (top < vals_sort[i]) {
466  top = vals_sort[i];
467  }
468  }
469 
470  if (already_sorted) {
471  return 0;
472  }
473 
474  Teuchos::ArrayRCP<int> store(new int[top+2], 0, top+2, true);
475  for(int n = 0; n < store.size(); ++n) {
476  store[n] = 0;
477  }
478 
479  Teuchos::ArrayRCP<int> copy_sort(new int[vals_sort.size()], 0, vals_sort.size(), true);
480  for(Teuchos::ArrayRCP<int>::size_type n = 0; n < copy_sort.size(); ++n) {
481  copy_sort[n] = vals_sort[n]; // TODO - use deepCopy method?
482  }
483 
484  Teuchos::ArrayRCP<int> copy_other(new int[vals_other.size()], 0, vals_other.size(), true);
485  for(Teuchos::ArrayRCP<int>::size_type n = 0; n < copy_other.size(); ++n) {
486  copy_other[n] = vals_other[n]; // TODO - use deepCopy method?
487  }
488 
489  // TODO: May want to modernize this ptr handling - however I didn't want
490  // to introduce inefficiencies so for now have kept the original structure
491  int *p = &(store[1]);
492  for (Teuchos::ArrayRCP<int>::size_type i = 0; i < vals_sort.size(); i++) {
493  p[copy_sort[i]]++; /* count number of occurrences */
494  }
495 
496  for (int i = 1; i < top+1; i++) {
497  p[i] += p[i-1]; /* compute partial sums */
498  }
499  /* assert: p[top] = nvals */
500  p = &(store[0]); /* effectively shifts down by one */
501  for (Teuchos::ArrayRCP<int>::size_type i = 0; i < vals_sort.size(); i++) {
502  vals_sort[p[copy_sort[i]]] = copy_sort[i];
503  vals_other[p[copy_sort[i]]] = copy_other[i];
504  ++p[copy_sort[i]];
505  }
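  // Worked example (hypothetical input): vals_sort = {2, 0, 2, 1} with
  // vals_other = {10, 11, 12, 13} yields vals_sort = {0, 1, 2, 2} and
  // vals_other = {11, 13, 10, 12}; equal keys keep their original order,
  // so the counting sort is stable.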
506 
507  return 0;
508 }
509 
510 int Zoltan2_Directory_Comm::do_forward(
511  int tag, /* message tag for communicating */
512  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
513  int nbytes, /* msg size */
514  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
515 {
516  int status = 0;
517 
518  if (!plan_forward->maxed_recvs) {
519  status = do_post (plan_forward, tag, send_data, nbytes, recv_data);
520  if (status == 0) {
521  status = do_wait (plan_forward, tag, send_data, nbytes, recv_data);
522  }
523  }
524  else {
525  status = do_all_to_all(plan_forward, send_data, nbytes, recv_data);
526  }
527 
528  return status;
529 }
530 
531 int Zoltan2_Directory_Comm::do_post(
532  Zoltan2_Directory_Plan *plan, /* communication data structure */
533  int tag, /* message tag for communicating */
534  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
535  int nbytes, /* msg size */
536  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
537 {
538  /* Check input parameters */
539  if (!plan) {
540  throw std::logic_error("Communication plan = NULL");
541  }
542 
543  /* If not point to point, currently we do synchronous communications */
544  if (plan->maxed_recvs) {
545  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (3)"); // needs unit testing
546  return do_all_to_all(plan, send_data, nbytes, recv_data);
547  }
548 
549  int my_proc = plan->comm->getRank(); /* processor ID */
550  if ((plan->nsends + plan->self_msg) && !send_data.size()) {
551  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (4)"); // needs unit testing
552  size_t sum = 0;
553  if (plan->sizes_to.size()) { /* Not an error if all sizes_to == 0 */
554  for (int i = 0; i < (plan->nsends + plan->self_msg); i++) {
555  sum += plan->sizes_to[i];
556  }
557  }
558  if (!plan->sizes_to.size() || (plan->sizes_to.size() && sum)) {
559  throw std::logic_error("nsends not zero, but send_data = NULL");
560  }
561  }
562  if ((plan->nrecvs + plan->self_msg) && recv_data == Teuchos::null) {
563  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (5)"); // needs unit testing
564  size_t sum = 0;
565  if (plan->sizes_from != Teuchos::null) /* Not an error if all sizes_from == 0 */
566  for (int i = 0; i < (plan->nrecvs + plan->self_msg); i++)
567  sum += plan->sizes_from[i];
568  if (!plan->sizes_from.size() || (plan->sizes_from.size() && sum)) {
569  throw std::logic_error("nrecvs not zero, but recv_data = NULL");
570  }
571  }
572 
573  /* Post irecvs */
574  if (plan->indices_from == Teuchos::null) {
575  /* Data can go directly into user space. */
576  plan->recv_buff = recv_data;
577  }
578  else { /* Need to buffer receive to reorder */
579  size_t rsize = (size_t) (plan->total_recv_size) * (size_t) nbytes;
580  plan->recv_buff = Teuchos::arcp(new char[rsize], 0, rsize, true);
581  }
582 
583  size_t self_recv_address = 0; /* where in recv_data self info starts */
584  if (!plan->using_sizes) { /* All data the same size */
585  int k = 0;
586  for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
587  if (plan->procs_from[i] != my_proc) {
588  Teuchos::ArrayRCP<char> subview(
589  &plan->getRecvBuff().getRawPtr()[plan->starts_from[i] * nbytes],
590  0, plan->lengths_from[i] * nbytes, false);
591  plan->request[k] = Teuchos::ireceive(subview, plan->procs_from[i], tag, *comm_);
592  k++;
593  }
594  else {
595  self_recv_address = (size_t)(plan->starts_from[i]) * (size_t)nbytes;
596  }
597  }
598  }
599  else { /* Data of varying sizes */
600  int k = 0;
601  for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
602  if (plan->procs_from[i] != my_proc) {
603  if (plan->sizes_from[i]) {
604  Teuchos::ArrayRCP<char> subview(
605  &plan->getRecvBuff().getRawPtr()[(size_t)(plan->starts_from_ptr[i])
606  * (size_t)nbytes],
607  0, plan->sizes_from[i] * nbytes, false);
608  plan->request[k] = Teuchos::ireceive(subview, plan->procs_from[i], tag, *comm_);
609  }
610  else {
611  plan->request[k] = Teuchos::null;
612  }
613  k++;
614  }
615  else {
616  self_recv_address =
617  (size_t)(plan->starts_from_ptr[i]) * (size_t)nbytes;
618  }
619  }
620  }
621 
622  Teuchos::ArrayRCP<char> send_buff;
623  if(plan->indices_to != Teuchos::null) {
624  send_buff = Teuchos::arcp(new char[plan->max_send_size * nbytes], 0, plan->max_send_size * nbytes, true);
625  }
626 
627  /* Barrier to ensure irecvs are posted before doing any sends. */
628  /* Simultaneously see if anyone out of memory */
629  int out_of_mem = 0;
630  // WARNING - do not delete this without adding a proper barrier as a replacement.
631  // The memory handling is being refactored, so out_of_mem probably won't be
632  // used in the new version, but a barrier must still be preserved here or we
633  // get intermittent failures.
634  // Keep the memory reduce for now since we may end up with memory handling
635  // anyway.
636  int global_out_of_mem;
637  Teuchos::reduceAll(*plan->comm, Teuchos::REDUCE_SUM, 1, &out_of_mem,
638  &global_out_of_mem);
639 
640  /* Send out data */
641 
642  /* Scan through procs_to list to start w/ higher numbered procs */
643  /* This should balance message traffic. */
644 
645  int nblocks = plan->nsends + plan->self_msg; /* # procs needing my data */
646  int proc_index = 0; /* loop counter over procs to send to */
647  while (proc_index < nblocks && plan->procs_to[proc_index] < my_proc) {
648  proc_index++;
649  }
650  if (proc_index == nblocks) {
651  proc_index = 0;
652  }
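  // Example (hypothetical): on rank 3 with procs_to = {0, 2, 5}, proc_index
  // stops at the entry for proc 5, so the sends below go out in the order
  // 5, 0, 2; starting at the first destination above my own rank staggers
  // traffic across the machine.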
653 
654  if (!plan->using_sizes) { /* Data all of same size */
655  if (plan->indices_to == Teuchos::null) { /* data already blocked by processor. */
656  int self_num = 0; /* where in send list my_proc appears */
657  for (int i = proc_index, j = 0; j < nblocks; j++) {
658  if (plan->procs_to[i] != my_proc) {
659  Teuchos::ArrayRCP<char> subview(
660  &send_data[plan->starts_to[i] * nbytes],
661  0, plan->lengths_to[i] * nbytes, false);
662  Teuchos::readySend(subview.getRawPtr(), static_cast<int>(subview.size()), plan->procs_to[i], tag, *comm_);
663  }
664  else {
665  self_num = i;
666  }
667  if (++i == nblocks) {
668  i = 0;
669  }
670  }
671 
672  if (plan->self_msg) { /* Copy data to self. */
673  /* I use array+offset instead of &(array[offset]) because of
674  a bug with PGI v9 */
675  /* I use memmove because I'm not sure that the pointers do not
676  overlap. */
677 
678  // TODO: Refactor to make C++ and remove getRawPtr, etc
679  memmove(
680  plan->getRecvBuff().getRawPtr()+self_recv_address,
681  send_data.getRawPtr()+(size_t)(plan->starts_to[self_num])*(size_t)nbytes,
682  (size_t) (plan->lengths_to[self_num]) * (size_t) nbytes);
683  }
684  }
685  else { /* Not blocked by processor. Need to buffer. */
686  int self_index = 0; /* send offset for data I'm keeping */
687  int self_num = 0; /* where in send list my_proc appears */
688  for (int i = proc_index, jj = 0; jj < nblocks; jj++) {
689  if (plan->procs_to[i] != my_proc) {
690  /* Need to pack message first. */
691  size_t offset = 0; /* offset into array I'm copying into */
692  int j = plan->starts_to[i];
693  for (int k = 0; k < plan->lengths_to[i]; k++) {
694  memcpy(&send_buff[offset],
695  &send_data[(size_t)(plan->indices_to[j++]) * (size_t)nbytes], nbytes);
696  offset += nbytes;
697  }
698  Teuchos::readySend(&send_buff[0], plan->lengths_to[i] * nbytes,
699  plan->procs_to[i], tag, *comm_);
700  }
701  else {
702  self_num = i;
703  self_index = plan->starts_to[i];
704  }
705  if (++i == nblocks)
706  i = 0;
707  }
708 
709  if (plan->self_msg) { /* Copy data to self. */
710  for (int k = 0; k < plan->lengths_to[self_num]; k++) {
711  memcpy(&(plan->getRecvBuff())[self_recv_address],
712  &send_data[
713  (size_t)(plan->indices_to[self_index++]) * (size_t)nbytes],
714  nbytes);
715  self_recv_address += nbytes;
716  }
717  }
718  }
719  }
720  else { /* Data of differing sizes */
721  if (plan->indices_to == Teuchos::null) { /* data already blocked by processor. */
722  int self_num = 0; /* where in send list my_proc appears */
723  for (int i = proc_index, j = 0; j < nblocks; j++) {
724  if (plan->procs_to[i] != my_proc) {
725  if (plan->sizes_to[i]) {
726  Teuchos::readySend(&send_data[plan->starts_to_ptr[i] * nbytes], plan->sizes_to[i] * nbytes,
727  plan->procs_to[i], tag, *comm_);
728  }
729  }
730  else
731  self_num = i;
732 
733  if (++i == nblocks)
734  i = 0;
735  }
736  if (plan->self_msg) { /* Copy data to self. */
737  if (plan->sizes_to[self_num]) {
738  char* lrecv = &plan->getRecvBuff().getRawPtr()[self_recv_address];
739  const char* lsend =
740  &send_data.getRawPtr()[(size_t)(plan->starts_to_ptr[self_num]) * (size_t)nbytes];
741  int sindex = plan->sizes_to[self_num], idx;
742  for (idx=0; idx<nbytes; idx++) {
743  memcpy(lrecv, lsend, sindex);
744  lrecv += sindex;
745  lsend += sindex;
746  }
747  }
748  }
749  }
750  else { /* Not blocked by processor. Need to buffer. */
751  int self_num = 0; /* where in send list my_proc appears */
752  for (int i = proc_index, jj = 0; jj < nblocks; jj++) {
753  if (plan->procs_to[i] != my_proc) {
754  size_t offset = 0; /* offset into array I'm copying into */
755  int j = plan->starts_to[i];
756  for (int k = 0; k < plan->lengths_to[i]; k++) {
757  if (plan->sizes[plan->indices_to[j]]) {
758  memcpy(&send_buff[offset],
759  &send_data[(size_t)(plan->indices_to_ptr[j]) * (size_t)nbytes],
760  (size_t)(plan->sizes[plan->indices_to[j]]) * (size_t)nbytes);
761  offset +=
762  (size_t)(plan->sizes[plan->indices_to[j]]) * (size_t)nbytes;
763  }
764  j++;
765  }
766  if (plan->sizes_to[i]) {
767  Teuchos::readySend(&send_buff[0], plan->sizes_to[i] * nbytes,
768  plan->procs_to[i], tag, *comm_);
769  }
770  }
771  else
772  self_num = i;
773 
774  if (++i == nblocks)
775  i = 0;
776  }
777  if (plan->self_msg) { /* Copy data to self. */
778  if (plan->sizes_to[self_num]) {
779  int j = plan->starts_to[self_num];
780  for (int k = 0; k < plan->lengths_to[self_num]; k++) {
781  int kk = plan->indices_to_ptr[j];
782  char* lrecv = &(plan->getRecvBuff())[self_recv_address];
783  size_t send_idx = (size_t)kk * (size_t)nbytes;
784  const char* lsend = &send_data[send_idx];
785  int sindex = plan->sizes[plan->indices_to[j]], idx;
786  for (idx=0; idx<nbytes; idx++) {
787  memcpy(lrecv, lsend, sindex);
788  lrecv += sindex;
789  lsend += sindex;
790  }
791  self_recv_address += (size_t)(plan->sizes[plan->indices_to[j]])
792  * (size_t) nbytes;
793  j++;
794  }
795  }
796  }
797  }
798  }
799 
800  return 0;
801 }
802 
803 int Zoltan2_Directory_Comm::do_wait(
804  Zoltan2_Directory_Plan *plan, /* communication data structure */
805  int /* tag */, /* message tag for communicating */
806  const Teuchos::ArrayRCP<char> &/* send_data */, /* array of data I currently own */
807  int nbytes, /* msg size */
808  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
809 {
810  /* If not point to point, currently we do synchronous communications */
811  if (plan->maxed_recvs){
812  /* Do nothing */
813  return 0;
814  }
815 
816  int my_proc = plan->comm->getRank(); /* processor ID */
817 
818  /* Wait for messages to arrive & unpack them if necessary. */
819  /* Note: since request is in plan, could wait in later routine. */
820  if (plan->indices_from == Teuchos::null) { /* No copying required */
821  if (plan->nrecvs > 0) {
822  Teuchos::waitAll(*comm_, plan->request());
823  }
824  }
825  else { /* Need to copy into recv_data. */
826  int self_num; /* where in send list my_proc appears */
827  size_t offsetDst = 0;
828  if (plan->self_msg) { /* Unpack own data before waiting */
829  for (self_num = 0; self_num < plan->nrecvs + plan->self_msg;
830  self_num++) {
831  if (plan->procs_from[self_num] == my_proc) {
832  break;
833  }
834  }
835 
836  if(plan->sizes_from.size()) {
837  // NEW METHOD for variable sized data
838  // This will NOT put the data in order but during the directory read
839  // of the buffer which follows, this data gets sorted anyways since each
840  // element has an index to tell where it belongs. I am not sure yet if
841  // we must sort here, or if it's fine to allow the sort to happen through
842  // the index value. This needs further discussion.
843  memcpy(&recv_data[offsetDst * (size_t)nbytes],
844  &(plan->getRecvBuff())[plan->starts_from_ptr[self_num] * (size_t)nbytes],
845  plan->sizes_from[self_num] * (size_t)nbytes);
846  offsetDst += plan->sizes_from[self_num];
847  }
848  else {
849  int k = plan->starts_from[self_num];
850  for (int j = plan->lengths_from[self_num]; j; j--) {
851  memcpy(&recv_data[(size_t)(plan->indices_from[k]) * (size_t)nbytes],
852  &(plan->getRecvBuff())[(size_t)k * (size_t)nbytes], nbytes);
853  k++;
854  }
855  }
856  }
857  else {
858  self_num = plan->nrecvs;
859  }
860 
861  for (int jj = 0; jj < plan->nrecvs; jj++) {
862  // TODO: Refactored directory to use Teuchos comm but we have no Teuchos::waitAny
863  // Short term fix is we just call wait() on each request in serial.
864  // When Teuchos::waitAny is available, this can be replaced with the old
865  // MPI_Waitany form, which is commented out below.
866  plan->request[jj]->wait();
867  int index = jj;
868 
869  // Old form with MPI_Waitany
870  // MPI_Status status; /* return from Waitany */
871  // int index;
872  // MPI_Waitany(plan->nrecvs, &plan->request[0], &index, &status);
873 
874  if (index >= self_num) {
875  index++;
876  }
877 
878  if(plan->sizes_from.size()) {
879  // NEW METHOD for variable sized data
880  // This will NOT put the data in order but during the directory read
881  // of the buffer which follows, this data gets sorted anyways since each
882  // element has an index to tell where it belongs. I am not sure yet if
883  // we must sort here, or if it's fine to allow the sort to happen through
884  // the index value. This needs further discussion.
885  memcpy(&recv_data[offsetDst * (size_t)nbytes],
886  &plan->getRecvBuff().getRawPtr()[plan->starts_from_ptr[index] * (size_t)nbytes],
887  plan->sizes_from[index] * (size_t)nbytes);
888  offsetDst += plan->sizes_from[index];
889  }
890  else {
891  int k = plan->starts_from[index];
892  for (int j = plan->lengths_from[index]; j; j--) {
893  memcpy(&recv_data.getRawPtr()[(size_t)(plan->indices_from[k]) * (size_t)nbytes],
894  &plan->getRecvBuff().getRawPtr()[(size_t)k * (size_t)nbytes], nbytes);
895  k++;
896  }
897  }
898  }
899  }
900 
901  return 0;
902 }
903 
904 /* Do_Post would require posting more receives than allowed on this platform.
905 * We use MPI_AlltoAllv instead, which is probably implemented such that each
906 * process does one receive at a time.
907 */
908 
909 int Zoltan2_Directory_Comm::do_all_to_all(
910  Zoltan2_Directory_Plan *plan, /* communication data structure */
911  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
912  int nbytes, /* msg size */
913  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
914 {
915  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (6)"); // needs unit testing
916 
917  int sm = (plan->self_msg > 0) ? 1 : 0;
918 
919  int nSendMsgs = plan->nsends + sm;
920  int nRecvMsgs = plan->nrecvs + sm;
921 
922  int nSendItems = 0;
923  for (int i=0; i <nSendMsgs; i++) {
924  nSendItems += plan->lengths_to[i];
925  }
926  int nRecvItems = 0;
927  for (int i=0; i <nRecvMsgs; i++) {
928  nRecvItems += plan->lengths_from[i];
929  }
930 
931  int nprocs = plan->comm->getSize();
932 
933  Teuchos::ArrayRCP<int> outbufCounts(new int[nprocs], 0, nprocs, true);
934  Teuchos::ArrayRCP<int> outbufOffsets(new int[nprocs], 0, nprocs, true);
935  Teuchos::ArrayRCP<int> inbufCounts(new int[nprocs], 0, nprocs, true);
936  Teuchos::ArrayRCP<int> inbufOffsets(new int[nprocs], 0, nprocs, true);
937 
938  /* The *_to fields of the plan refer to the items in the send_data buffer,
939  * and how to pull out the correct items for each receiver. The
940  * *_from fields of the plan refer to the recv_data buffer. Items
941  * arrive in process rank order, and these fields tell us where to
942  * put them in the recv_data buffer.
943  */
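  /* Example (hypothetical, constant item sizes): if this rank sends 2 items to
   * proc 1 and 1 item to proc 3 out of 4 ranks, the loops below produce
   * outbufCounts = {0, 2*nbytes, 0, nbytes} and running byte offsets
   * outbufOffsets = {0, 0, 2*nbytes, 2*nbytes}.
   */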
944 
945  /* CREATE SEND BUFFER */
946 
947  int sorted = 0;
948  if (plan->indices_to == Teuchos::null){
949  sorted = 1;
950  for (int i=1; i< nSendMsgs; i++){
951  if (plan->starts_to[i] < plan->starts_to[i-1]){
952  sorted = 0;
953  break;
954  }
955  }
956  }
957 
958  Teuchos::ArrayRCP<char> outbuf;
959  Teuchos::ArrayRCP<char> inbuf;
960  Teuchos::ArrayRCP<char> buf;
961 
962  if (plan->sizes_to.size()){
963  /*
964  * Each message contains items for a process, and each item may be
965  * a different size.
966  */
967 
968  int outbufLen = 0;
969  for (int i = 0; i < nSendMsgs; i++){
970  outbufLen += plan->sizes_to[i];
971  }
972 
973  if (plan->indices_to != Teuchos::null) {
974  /*
975  * items are not grouped by message
976  */
977  buf.resize(outbufLen*nbytes);
978  outbuf.resize(outbufLen*nbytes);
979  char * pBufPtr = &(outbuf[0]);
980  int i = 0;
981  int k = 0;
982  for (int p = 0; p < nprocs; p++) {
983 
984  int length = 0;
985 
986  if (i < nSendMsgs){
987  if (plan->procs_to[i] == p){ /* procs_to is sorted */
988 
989  for (int j=0; j < plan->lengths_to[i]; j++,k++){
990  int itemSize = plan->sizes[plan->indices_to[k]] * nbytes;
991  int offset = plan->indices_to_ptr[k] * nbytes;
992 
993  memcpy(pBufPtr, &(send_data[0]) + offset, itemSize);
994 
995  pBufPtr += itemSize;
996  length += itemSize;
997  }
998  i++;
999  }
1000  }
1001 
1002  outbufCounts[p] = length;
1003  if (p){
1004  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1005  }
1006  }
1007  }
1008  else{
1009  /*
1010  * items are stored contiguously for each message
1011  */
1012 
1013  if (!sorted || (plan->nvals > nSendItems) ){
1014  buf.resize(outbufLen*nbytes);
1015  outbuf.resize(outbufLen*nbytes);
1016  }
1017  else{
1018  /* All items in send_data are being sent, and they are sorted
1019  * in process rank order.
1020  */
1021  // TODO: Optimize - original just set the ptr...
1022  for(int n = 0; n < outbufLen*nbytes; ++n) {
1023  outbuf[n] = send_data[n];
1024  }
1025  }
1026 
1027  char * pBufPtr = &(outbuf[0]);
1028 
1029  int i = 0;
1030  for (int p = 0; p < nprocs; p++) {
1031 
1032  int length = 0;
1033 
1034  if (i < nSendMsgs){
1035  if (plan->procs_to[i] == p){ /* procs_to is sorted */
1036  length = plan->sizes_to[i] * nbytes;
1037  int offset = plan->starts_to_ptr[i] * nbytes;
1038 
1039  if ((!sorted || (plan->nvals > nSendItems)) && length){
1040  memcpy(pBufPtr, &(send_data[0]) + offset, length);
1041  pBufPtr += length;
1042  }
1043  i++;
1044  }
1045  }
1046 
1047  outbufCounts[p] = length;
1048  if (p){
1049  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1050  }
1051  }
1052  }
1053  }
1054  else if (plan->indices_to != Teuchos::null) {
1055  /*
1056  * item sizes are constant, however the items belonging in a given
1057  * message may not be contiguous in send_data
1058  */
1059  buf.resize(nSendItems*nbytes);
1060  outbuf.resize(nSendItems*nbytes);
1061  char * pBufPtr = &(outbuf[0]);
1062  int i = 0;
1063  int k = 0;
1064  for (int p = 0; p < nprocs; p++){
1065 
1066  int length = 0;
1067 
1068  if (i < nSendMsgs){
1069  if (plan->procs_to[i] == p){ /* procs_to is sorted */
1070  for (int j=0; j < plan->lengths_to[i]; j++,k++) {
1071  int offset = plan->indices_to[k] * nbytes;
1072  memcpy(pBufPtr, &(send_data[0]) + offset, nbytes);
1073  pBufPtr += nbytes;
1074  }
1075  length = plan->lengths_to[i] * nbytes;
1076  i++;
1077  }
1078  }
1079 
1080  outbufCounts[p] = length;
1081  if (p){
1082  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1083  }
1084  }
1085  }
1086  else{
1087 
1088  /* item sizes are constant, and items belonging to a
1089  * given message are always stored contiguously in send_data
1090  */
1091 
1092  if (!sorted || (plan->nvals > nSendItems)){
1093  buf.resize(nSendItems*nbytes);
1094  outbuf.resize(nSendItems*nbytes);
1095  }
1096  else{
1097  /* send_data is sorted by process, and we don't skip
1098  * any of the data in the buffer, so we can use send_data
1099  * in the alltoall call
1100  */
1101  // TODO: Optimize - original just set ptr
1102  outbuf = send_data;
1103  }
1104 
1105  char * pBufPtr = &(outbuf[0]);
1106 
1107  int i = 0;
1108  for (int p=0; p < nprocs; p++) {
1109 
1110  int length = 0;
1111 
1112  if (i < nSendMsgs){
1113  if (plan->procs_to[i] == p){ /* procs_to is sorted */
1114  int offset = plan->starts_to[i] * nbytes;
1115  length = plan->lengths_to[i] * nbytes;
1116 
1117  if ((!sorted || (plan->nvals > nSendItems)) && length){
1118  memcpy(pBufPtr, &(send_data[0]) + offset, length);
1119  pBufPtr += length;
1120  }
1121  i++;
1122  }
1123  }
1124 
1125  outbufCounts[p] = length;
1126  if (p){
1127  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1128  }
1129  }
1130  }
1131 
1132  /* CREATE RECEIVE BUFFER */
1133 
1134  sorted = 0;
1135  int i;
1136  if (plan->indices_from == Teuchos::null) {
1137  sorted = 1;
1138  for (i=1; i< nRecvMsgs; i++) {
1139  if (plan->starts_from[i] < plan->starts_from[i-1]){
1140  sorted = 0;
1141  break;
1142  }
1143  }
1144  }
1145 
1146  if (sorted){
1147  /* Caller already expects received data to be ordered by
1148  * the sending process rank.
1149  */
1150 
1151  // TODO: Optimize - original just set ptr
1152  outbuf = send_data;
1153  inbuf = recv_data;
1154  }
1155  else {
1156  inbuf.resize(plan->total_recv_size * nbytes);
1157  }
1158 
1159  for (int p = 0; p < nprocs; p++) {
1160  int length = 0;
1161 
1162  if (i < nRecvMsgs){
1163  if (plan->procs_from[i] == p){
1164 
1165  if (!plan->using_sizes){
1166  length = plan->lengths_from[i] * nbytes;
1167  }
1168  else{
1169  length = plan->sizes_from[i] * nbytes;
1170  }
1171  i++;
1172  }
1173  }
1174 
1175  inbufCounts[p] = length;
1176  if (p){
1177  inbufOffsets[p] = inbufOffsets[p-1] + inbufCounts[p-1];
1178  }
1179  }
1180 
1181  /* EXCHANGE DATA */
1182  // Did not refactor this - need Teuchos form but not tested code
1183  // MPI_Alltoall(&(sendbuf[0]), 1, MPI_INT, &(recvbuf[0]), 1, MPI_INT,
1184  // getRawComm());
1185 
1186  /* WRITE RECEIVED DATA INTO USER'S BUFFER WHERE IT'S EXPECTED */
1187 
1188  if (!sorted){
1189 
1190  char * pBufPtr = &(inbuf[0]);
1191 
1192  if (!plan->using_sizes){
1193 
1194  /* each item in each message is nbytes long */
1195 
1196  if (plan->indices_from == Teuchos::null) {
1197  for (i=0; i < nRecvMsgs; i++){
1198  int offset = plan->starts_from[i] * nbytes;
1199  int length = plan->lengths_from[i] * nbytes;
1200  memcpy(&(recv_data[0]) + offset, pBufPtr, length);
1201  pBufPtr += length;
1202  }
1203  }
1204  else{
1205  int k = 0;
1206  for (i=0; i < nRecvMsgs; i++) {
1207 
1208  for (int j=0; j < plan->lengths_from[i]; j++,k++){
1209  int offset = plan->indices_from[k] * nbytes;
1210  memcpy(&(recv_data[0]) + offset, pBufPtr, nbytes);
1211  pBufPtr += nbytes;
1212  }
1213  }
1214  }
1215  }
1216  else{ /* (sizes!=NULL) && (indices_from!=NULL) not allowed by Zoltan_Comm_Resize */
1217 
1218  /* items can be different sizes */
1219 
1220  for (i=0; i < nRecvMsgs; i++){
1221  int offset = plan->starts_from_ptr[i] * nbytes;
1222  int length = plan->sizes_from[i] * nbytes;
1223  memcpy(&(recv_data[0]) + offset, pBufPtr, length);
1224  pBufPtr += length;
1225  }
1226  }
1227  }
1228 
1229  return 0;
1230 }
1231 
1232 int Zoltan2_Directory_Comm::do_reverse(
1233  int tag, /* message tag for communicating */
1234  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
1235  int nbytes, /* msg size */
1236  const Teuchos::ArrayRCP<int> &sizes,
1237  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after reverse comm */
1238 {
1239  /* create plan->plan_reverse
1240  */
1241  int status = create_reverse_plan(tag, sizes);
1242 
1243  // NEW METHOD
1244  // Set up recv_data with the proper size
1245  // This information is only available after the above create_reverse_plan is
1246  // called so we can setup the return data with proper buffer size now.
1247  // However should we do this here?
1248  size_t new_size = plan_forward->plan_reverse->total_recv_size*nbytes;
1249  if(new_size > 0) {
1250  // have to be careful with new[0], which can for example turn an
1251  // ArrayRCP getRawPtr() from a valid NULL read into a debug assert failure.
1252  recv_data = Teuchos::arcp(new char[new_size], 0, new_size, true);
1253  }
1254 
1255  if (status == 0) {
1256 
1257  if (plan_forward->plan_reverse->maxed_recvs) {
1258 
1259  throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (7)"); // needs unit testing
1260 
1261  /* use MPI_Alltoallv to implement plan->plan_reverse, because comm_do_post
1262  * would post more receives than allowed on this machine
1263  */
1264 
1265  status = do_all_to_all(plan_forward->plan_reverse, send_data,
1266  nbytes, recv_data);
1267  }
1268  else {
1269  /* use post/wait which is faster when each sends to few
1270  */
1271  status = do_post(plan_forward->plan_reverse, tag, send_data,
1272  nbytes, recv_data);
1273 
1274  if (status == 0) {
1275  status = do_wait (plan_forward->plan_reverse, tag, send_data,
1276  nbytes, recv_data);
1277  }
1278  }
1279  }
1280 
1281  free_reverse_plan(plan_forward);
1282 
1283  return status;
1284 }
1285 
1286 void Zoltan2_Directory_Comm::free_reverse_plan(Zoltan2_Directory_Plan *plan)
1287 {
1288  if(!plan) {
1289  throw std::logic_error("Plan is NULL!");
1290  }
1291  delete plan->plan_reverse;
1292  plan->plan_reverse = NULL;
1293 }
1294 
1295 int Zoltan2_Directory_Comm::create_reverse_plan(
1296  int tag,
1297  const Teuchos::ArrayRCP<int> &sizes)/* variable size of objects (if not size 0) */
1298 {
1299  /* Check input parameters */
1300  if (!plan_forward){
1301  throw std::logic_error("memory error");
1302  }
1303 
1304  /* Let Zoltan_Comm_Do check the remaining parameters. */
1305  plan_forward->plan_reverse = new Zoltan2_Directory_Plan;
1306  plan_forward->plan_reverse->getInvertedValues(plan_forward);
1307 
1308  if (MPI_RECV_LIMIT > 0){
1309  /* If we have a limit to the number of posted receives we are allowed,
1310  ** and our plan has exceeded that, then switch to an MPI_Alltoallv so
1311  ** that we will have fewer receives posted when we do the communication.
1312  */
1313  int global_nsends;
1314  Teuchos::reduceAll<int>(*plan_forward->comm, Teuchos::REDUCE_SUM, 1,
1315  &plan_forward->nsends, &global_nsends);
1316  if (global_nsends > MPI_RECV_LIMIT){
1317  plan_forward->plan_reverse->maxed_recvs = 1;
1318  }
1319  }
1320 
1321  if (plan_forward->plan_reverse->maxed_recvs == 0) {
1322  // See notes in header for MPI_Request
1323  // plan_forward->plan_reverse->request = Teuchos::arcp(new MPI_Request[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
1324  // plan_forward->plan_reverse->status = Teuchos::arcp(new MPI_Status[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
1325  plan_forward->plan_reverse->request.resize(plan_forward->plan_reverse->nrecvs);
1326  }
1327 
1328  int sum_recv_sizes;
1329  int comm_flag = resize( plan_forward->plan_reverse,
1330  sizes, tag, &sum_recv_sizes);
1331 
1332  if (comm_flag != 0) {
1333  return(comm_flag);
1334  }
1335 
1336  if (sum_recv_sizes != plan_forward->plan_reverse->total_recv_size){
1337  /* Sanity check */
1338  return 1;
1339  }
1340 
1341  return 0;
1342 }
1343 
1344 int Zoltan2_Directory_Comm::resize(
1345  const Teuchos::ArrayRCP<int> &sizes, /* size of each item I'm sending */
1346  int tag, /* message tag I can use */
1347  int *sum_recv_sizes) /* sum of the sizes of the items I'll receive */
1348 {
1349  return resize(plan_forward, sizes, tag, sum_recv_sizes);
1350 }
1351 
1352 int Zoltan2_Directory_Comm::resize(
1353  Zoltan2_Directory_Plan *plan, /* communication plan object */
1354  const Teuchos::ArrayRCP<int> &sizes, /* size of each item I'm sending */
1355  int tag, /* message tag I can use */
1356  int *sum_recv_sizes) /* sum of the sizes of the items I'll receive */
1357 {
1358  /* If sizes vary, then I need to compute and communicate message lengths */
1359  /* First check if sizes array is NULL on all procs. */
1360  int my_proc = plan->comm->getRank(); /* my processor ID */
1361  int has_sizes = (sizes.size() != 0);
1362  int var_sizes; /* items have variable sizes? */
1363 
1364  Teuchos::reduceAll(*comm_, Teuchos::REDUCE_BOR, 1, &has_sizes, &var_sizes);
1365 
1366  if (var_sizes && plan->indices_from != Teuchos::null) {
1367  // NEW METHOD
1368  // Allow this to run now - the below implementation is working but perhaps
1369  // not done correctly for other usage cases I have not considered yet.
1370  // throw std::logic_error("Non-blocked, variable-sized recvs not supported");
1371  }
1372 
1373  int nsends = plan->nsends; /* number of msgs I'll send */
1374  int nrecvs = plan->nrecvs; /* number of msgs I'll recv */
1375  int self_msg = plan->self_msg;
1376 
1377  Teuchos::ArrayRCP<int> sizes_to;
1378  Teuchos::ArrayRCP<int> sizes_from;
1379  Teuchos::ArrayRCP<int> starts_to_ptr;
1380  Teuchos::ArrayRCP<int> starts_from_ptr;
1381  Teuchos::ArrayRCP<int> indices_to_ptr;
1382  Teuchos::ArrayRCP<int> indices_from_ptr;
1383 
1384  if (!var_sizes) { /* Easy case. Size = length */
1385  plan->total_recv_size = 0;
1386  for (int i = 0; i < nrecvs + self_msg; i++) {
1387  plan->total_recv_size += plan->lengths_from[i];
1388  }
1389 
1390  plan->max_send_size = 0;
1391  for (int i = 0; i < nsends + self_msg; i++) {
1392  if (plan->procs_to[i] != my_proc &&
1393  plan->lengths_to[i] > plan->max_send_size) {
1394  plan->max_send_size = plan->lengths_to[i];
1395  }
1396  }
1397  }
1398  else { /* Need to actually compute message sizes */
1399 
1400  // TODO Investigate purpose of the +1 in the old code. Is that used?
1401 
1402  // OLD CODE
1403  // plan->sizes.resize(plan->nvals + 1);
1404  // for (int i = 0; i < plan->nvals; i++) {
1405  // plan->sizes[i] = sizes[i];
1406  // }
1407 
1408  // NEW CODE
1409  plan->sizes = sizes; // can we just copy?
1410  plan->using_sizes = true;
1411 
1412  if(nsends + self_msg > 0) {
1413  sizes_to = Teuchos::arcp(
1414  new int[nsends + self_msg], 0, nsends + self_msg, true);
1415  for(int n = 0; n < sizes_to.size(); ++n) {
1416  sizes_to[n] = 0;
1417  }
1418  }
1419  if(nrecvs + self_msg > 0) {
1420  sizes_from = Teuchos::arcp(
1421  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1422  }
1423 
1424  /* Several cases:
1425  1. indices_to == NULL
1426  => starts_to != NULL, need to allocate, set starts_to_ptr
1427  2. indices_to != NULL (=> starts_to == NULL)
1428  need to allocate, set indices_to_ptr
1429  3,4. mirror cases for _from
1430  */
1431  if(nsends + self_msg > 0) {
1432  starts_to_ptr = Teuchos::arcp(
1433  new int[nsends + self_msg], 0, nsends + self_msg, true);
1434  }
1435  if (plan->indices_to == Teuchos::null) {
1436  /* Simpler case; sends already blocked by processor */
1437  Teuchos::ArrayRCP<int> index;
1438  Teuchos::ArrayRCP<int> sort_val;
1439  if(nsends + self_msg > 0) {
1440  index = Teuchos::arcp(
1441  new int[nsends + self_msg], 0, nsends + self_msg, true);
1442  sort_val = Teuchos::arcp(
1443  new int[nsends + self_msg], 0, nsends + self_msg, true);
1444  }
1445  for (int i = 0; i < nsends + self_msg; i++) {
1446  int j = plan->starts_to[i];
1447 
1448  for (int k = 0; k < plan->lengths_to[i]; k++) {
1449  sizes_to[i] += sizes[j++];
1450  }
1451  if (sizes_to[i] > plan->max_send_size &&
1452  plan->procs_to[i] != my_proc)
1453  plan->max_send_size = sizes_to[i];
1454  }
1455  for (int i = 0; i < nsends + self_msg; i++) {
1456  sort_val[i] = plan->starts_to[i];
1457  index[i] = i;
1458  }
1459  sort_ints(sort_val, index);
1460  int sum = 0;
1461  for (int i = 0; i < nsends + self_msg; i++) {
1462  starts_to_ptr[index[i]] = sum;
1463  sum += sizes_to[index[i]];
1464  }
1465  }
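  // Example (hypothetical): two send blocks with starts_to = {0, 2},
  // lengths_to = {2, 3} and per-item sizes = {1, 4, 2, 2, 3} give
  // sizes_to = {5, 7} and starts_to_ptr = {0, 5}; the *_ptr offsets are in
  // "size units" and are scaled by nbytes when the buffers are addressed.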
1466  else { /* Harder case, sends not blocked */
1467  Teuchos::ArrayRCP<int> offset;
1468  if(plan->nvals > 0) {
1469  offset = Teuchos::arcp(new int[plan->nvals], 0, plan->nvals, true);
1470  }
1471  indices_to_ptr.resize(plan->nvals);
1472 
1473  /* Compute address for every item in send array */
1474  int sum = 0;
1475  for (int i = 0; i < plan->nvals; i++) {
1476  offset[i] = sum;
1477  sum += sizes[i];
1478  }
1479 
1480  sum = 0;
1481  plan->max_send_size = 0;
1482  for (int i = 0; i < nsends + self_msg; i++) {
1483  starts_to_ptr[i] = sum;
1484  int j = plan->starts_to[i];
1485  for (int k = 0; k < plan->lengths_to[i]; k++) {
1486  indices_to_ptr[j] = offset[plan->indices_to[j]];
1487  sizes_to[i] += sizes[plan->indices_to[j++]];
1488  }
1489  if (sizes_to[i] > plan->max_send_size &&
1490  plan->procs_to[i] != my_proc)
1491  plan->max_send_size = sizes_to[i];
1492  sum += sizes_to[i];
1493  }
1494  }
1495 
1496  /* Note: This routine only gets message sizes, not object sizes. */
1497  /* Anything requiring item sizes requires more code */
1498  /* Should such functionality reside here? */
1499  exchange_sizes(sizes_to, plan->procs_to, nsends, self_msg,
1500  sizes_from, plan->procs_from, nrecvs,
1501  &plan->total_recv_size, my_proc, tag, plan->comm);
1502 
1503  if(nrecvs + self_msg > 0) {
1504  starts_from_ptr = Teuchos::arcp(
1505  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1506  }
1507 
1508  if (plan->indices_from == Teuchos::null) {
1509  /* Simpler case; recvs already blocked by processor */
1510  Teuchos::ArrayRCP<int> index;
1511  Teuchos::ArrayRCP<int> sort_val;
1512  if(nrecvs + self_msg > 0) {
1513  index = Teuchos::arcp(
1514  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1515  sort_val = Teuchos::arcp<int>(
1516  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1517  }
1518 
1519  for (int i = 0; i < nrecvs + self_msg; i++) {
1520  sort_val[i] = plan->starts_from[i];
1521  index[i] = i;
1522  }
1523  sort_ints(sort_val, index);
1524 
1525  int sum = 0;
1526  for (int i = 0; i < nrecvs + self_msg; i++) {
1527  starts_from_ptr[index[i]] = sum;
1528  sum += sizes_from[index[i]];
1529  }
1530  }
1531  else {
1532  // OLD COMMENT left here for reference but to be deleted
1533  /* Harder case, recvs not blocked */
1534  /* Not currently supported */
1535  /* Can't do w/o individual item sizes */
1536  /* Currently checked for at top of file */
1537 
1538  // NEW METHOD
1539  // Note this is currently just a duplicate of above block which is working
1540  // since I've set up do_wait to just copy the entire block. However I am
1541  // not sure yet how to organize this implementation and suspect this may
1542  // not be correct anyways - though it seems to work. After do_wait copies
1543  // the data (out of order) the directory will handle resorting it since
1544  // each element has index to indicate where it goes in the array. In the
1545  // original do_wait implementation it seemed it was setup to place the
1546  // elements in order, even though they would be sorted later, so that
1547  // part I need to discuss further
1548  Teuchos::ArrayRCP<int> index;
1549  Teuchos::ArrayRCP<int> sort_val;
1550  if(nrecvs + self_msg > 0) {
1551  index = Teuchos::arcp(
1552  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1553  sort_val = Teuchos::arcp(
1554  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1555  }
1556 
1557  for (int i = 0; i < nrecvs + self_msg; i++) {
1558  sort_val[i] = plan->starts_from[i];
1559  index[i] = i;
1560  }
1561  sort_ints(sort_val, index);
1562 
1563  int sum = 0;
1564  for (int i = 0; i < nrecvs + self_msg; i++) {
1565  starts_from_ptr[index[i]] = sum;
1566  sum += sizes_from[index[i]];
1567  }
1568  }
1569  }
1570  plan->sizes_to = sizes_to;
1571  plan->sizes_from = sizes_from;
1572  plan->starts_to_ptr = starts_to_ptr;
1573  plan->starts_from_ptr = starts_from_ptr;
1574  plan->indices_to_ptr = indices_to_ptr;
1575  plan->indices_from_ptr = indices_from_ptr;
1576 
1577  if (sum_recv_sizes) {
1578  *sum_recv_sizes = plan->total_recv_size;
1579  }
1580 
1581  return 0;
1582 }
1583 
1584 int Zoltan2_Directory_Comm::exchange_sizes(
1585  const Teuchos::ArrayRCP<int> &sizes_to, /* value I need to exchange (size of true msg) */
1586  const Teuchos::ArrayRCP<int> &procs_to, /* procs I send to */
1587  int nsends, /* number of messages I'll send */
1588  int self_msg, /* do I copy data to myself? */
1589  Teuchos::ArrayRCP<int> &sizes_from, /* (returned) size of all my receives */
1590  const Teuchos::ArrayRCP<int> &procs_from, /* procs I recv from */
1591  int nrecvs, /* number of messages I receive */
1592  int *total_recv_size, /* (returned) sum of all incoming sizes */
1593  int my_proc, /* my processor number */
1594  int tag, /* message tag I can use */
1595  Teuchos::RCP<const Teuchos::Comm<int> > /* comm */) { /* communicator */
1596 
1597  /* If sizes vary, then I need to communicate message lengths */
1598  int self_index_to = -1; /* location of self in procs_to */
1599  for (int i = 0; i < nsends + self_msg; i++) {
1600  if (procs_to[i] != my_proc) {
1601 #ifdef HAVE_MPI // Teuchos::send not implemented for Serial - Serial is just for debugging
1602  Teuchos::send(*comm_, 1, &sizes_to[i], procs_to[i]);
1603 #endif
1604  }
1605  else {
1606  self_index_to = i;
1607  }
1608  }
1609 
1610  *total_recv_size = 0;
1611 
1612  for (int i = 0; i < nrecvs + self_msg; i++) {
1613  if (procs_from[i] != my_proc) {
1614 #ifdef HAVE_MPI // Teuchos::receive not implemented for Serial - Serial is just for debugging
1615  Teuchos::receive(*comm_, procs_from[i], 1, &sizes_from[i]);
1616 #endif
1617  }
1618  else {
1619  sizes_from[i] = sizes_to[self_index_to];
1620  }
1621  *total_recv_size += sizes_from[i];
1622  }
1623  return 0;
1624 }
1625 
1626 } // end namespace Zoltan2