Zoltan2_Directory_Comm.cpp
1 /*
2  * @HEADER
3  *
4  * ***********************************************************************
5  *
6  * Zoltan2 Directory for Load-balancing, Partitioning, Ordering and Coloring
7  * Copyright 2012 Sandia Corporation
8  *
9  * Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
10  * the U.S. Government retains certain rights in this software.
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions are
14  * met:
15  *
16  * 1. Redistributions of source code must retain the above copyright
17  * notice, this list of conditions and the following disclaimer.
18  *
19  * 2. Redistributions in binary form must reproduce the above copyright
20  * notice, this list of conditions and the following disclaimer in the
21  * documentation and/or other materials provided with the distribution.
22  *
23  * 3. Neither the name of the Corporation nor the names of the
24  * contributors may be used to endorse or promote products derived from
25  * this software without specific prior written permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
28  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
30  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
31  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
32  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
33  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
34  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
35  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
36  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
37  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  *
39  * Questions? Contact Karen Devine kddevin@sandia.gov
40  * Erik Boman egboman@sandia.gov
41  *
42  * ***********************************************************************
43  *
44  * @HEADER
45  */
46 
47 #include "Zoltan2_Directory_Comm.hpp"
48 #include <stdexcept>
49 #include <memory>
50 
51 namespace Zoltan2 {
52 
53 void Zoltan2_Directory_Plan::getInvertedValues(Zoltan2_Directory_Plan *from) {
54  total_recv_size = 0;
55  for (int i = 0; i < from->nsends + from->self_msg; i++) {
56  total_recv_size += from->lengths_to[i];
57  }
58 
59  max_send_size = 0;
60  for (int i = 0; i < from->nrecvs; i++) {
61  if (from->lengths_from[i] > max_send_size) {
62  max_send_size = from->lengths_from[i];
63  }
64  }
65 
66  nvals = from->nvals_recv;
67  nvals_recv = from->nvals;
68  lengths_to = from->lengths_from;
69  procs_to = from->procs_from;
70  indices_to = from->indices_from;
71 
72  starts_to = from->starts_from;
73  lengths_from = from->lengths_to;
74  procs_from = from->procs_to;
75  indices_from = from->indices_to;
76 
77  starts_from = from->starts_to;
78  nrecvs = from->nsends;
79  nsends = from->nrecvs;
80  self_msg = from->self_msg;
81  comm = from->comm;
82 }
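// Worked example of the inversion above (illustration only, not library code):
// if the forward plan sends lengths_to = {2, 5} to procs_to = {1, 3}, the
// inverted plan expects to receive lengths_from = {2, 5} from procs_from =
// {1, 3}, and symmetrically its sends come from the forward plan's receive
// description, so a reverse communication retraces the forward one.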
83 
84 
85 void Zoltan2_Directory_Plan::print(const std::string& headerMessage) const {
86 
87  #define PRINT_VECTOR(v) \
88  if(v != Teuchos::null) { \
89  std::cout << " " << #v << " "; \
90  for(Teuchos::ArrayRCP<int>::size_type n = 0; n < v.size(); ++n) { \
91  std::cout << v[n] << " "; \
92  } \
93  std::cout << std::endl; \
94  }
95 
96  #define PRINT_VAL(val) std::cout << " " << #val << ": " << val << std::endl;
97 
98  for(int proc = 0; proc < comm->getSize(); ++proc) {
99  comm->barrier();
100  if(proc == comm->getRank()) {
101 
102  std::cout << "Rank " << proc << " " << headerMessage << std::endl;
103 
119 
128  }
129  }
130  comm->barrier();
131 }
132 
133 Zoltan2_Directory_Comm::Zoltan2_Directory_Comm(
134  int nvals, /* number of values I currently own */
135  const Teuchos::ArrayRCP<int> &assign, /* processor assignment for all my values */
136  Teuchos::RCP<const Teuchos::Comm<int> > comm, /* communicator */
137  int tag) : /* message tag I can use */
138  comm_(comm),
139  plan_forward(NULL)
140 {
141  if (comm == Teuchos::null){
142  throw std::logic_error("Invalid communicator: MPI_COMM_NULL.");
143  }
144 
145  int my_proc = comm->getRank(); /* my processor tag in communicator */
146  int nprocs = comm->getSize(); /* number of processors in communicator */
147 
148  /* First check to see if items are grouped by processor with no gaps. */
149  /* If so, indices_to should be NULL (= identity) */
150 
151  /* Make data structures that will allow me to traverse arrays quickly. */
152  /* Begin by determining number of objects I'm sending to each processor. */
153  Teuchos::ArrayRCP<int> starts(new int[nprocs + 1], 0, nprocs + 1, true);
154  for(int n = 0; n < starts.size(); ++n) {
155  starts[n] = 0;
156  }
157 
158  /* Note: Negative assign value means ignore item. */
159  /* Non-trailing negatives mean data not packed so need send_buf. */
160  /* Could (but don't) allow negatives between processor blocks w/o buf. */
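// Example of the grouping check below (illustration only): with nprocs = 3,
// assign = {2, 2, 0, 0, 1} is already blocked by processor, so no_send_buff
// stays 1 and indices_to can remain null (identity). By contrast,
// assign = {2, 0, 2} revisits processor 2 after leaving it, so no_send_buff
// drops to 0 and the data must be packed through indices_to.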
161  int nactive = 0; /* number of values to remap */
162 
163  int no_send_buff = 1; /* is data nicely grouped by processor? */
164 
165  int prev_proc = nprocs; /* processor on previous loop pass */
166 
167  for (int i = 0; i < nvals; i++) {
168  int proc = assign[i];
169  if (no_send_buff && proc != prev_proc) { /* Checks if blocked by proc */
170  if (proc >= 0 && (starts[proc] || prev_proc < 0)) {
171  no_send_buff = 0;
172  }
173  else {
174  prev_proc = proc;
175  }
176  }
177  if (proc >= 0) {
178  ++starts[proc];
179  ++nactive;
180  }
181  }
182 
183  int self_msg = (starts[my_proc] != 0); /* do I have data for myself? */
184 
185  Teuchos::ArrayRCP<int> lengths_to; /* lengths I'll send to */
186  Teuchos::ArrayRCP<int> procs_to; /* processors I'll send to */
187  Teuchos::ArrayRCP<int> starts_to; /* where in list my sends begin */
188  Teuchos::ArrayRCP<int> indices_to; /* local_id values I'll be sending */
189 
190  int max_send_size = 0; /* size of longest message I send */
191  int nsends = 0; /* # procs I'll send to (including self) */
192  int nrecvs = 0; /* # procs I'll recv from (including self) */
193 
194  if (no_send_buff) {
195  /* Grouped by processor. Array indices_to can be NULL (= identity) */
196  nsends = 0;
197  for (int i = 0; i < nprocs; i++) {
198  if (starts[i] != 0) ++nsends;
199  }
200 
201  lengths_to.resize(nsends);
202  starts_to.resize(nsends);
203  procs_to.resize(nsends);
204 
205  int index = 0; /* index into list of objects */
206  /* Note that procs_to is in the order the data was passed in. */
207  for (int i = 0; i < nsends; i++) {
208  starts_to[i] = index;
209  int proc = assign[index];
210  procs_to[i] = proc;
211  index += starts[proc];
212  }
213 
214  /* Now sort the outgoing procs. */
215  /* This keeps recvs deterministic if I ever invert communication */
216  /* It also allows for better balance of traffic in comm_do */
217  sort_ints(procs_to, starts_to);
218 
219  max_send_size = 0;
220  for (int i = 0; i < nsends; i++) {
221  int proc = procs_to[i];
222  lengths_to[i] = starts[proc];
223  if (proc != my_proc && lengths_to[i] > max_send_size) {
224  max_send_size = lengths_to[i];
225  }
226  }
227  }
228  else { /* Not grouped by processor. More complex data structures. */
229  /* Sum starts values to be offsets into indices_to array. */
230  nsends = (starts[0] != 0);
231  for (int i = 1; i < nprocs; i++) {
232  if (starts[i] != 0)
233  ++nsends;
234  starts[i] += starts[i - 1];
235  }
236 
237  for (int i = nprocs - 1; i; i--) {
238  starts[i] = starts[i - 1];
239  }
240 
241  starts[0] = 0;
242 
243  indices_to = (nactive > 0) ?
244  Teuchos::arcp(new int[nactive], 0, nactive, true) : Teuchos::null;
245 
246  for (int i = 0; i < nvals; i++) {
247  int proc = assign[i];
248  if (proc >= 0) {
249  indices_to[starts[proc]] = i;
250  ++starts[proc];
251  }
252  }
253 
254  /* Indices_to array now has the data in clumps for each processor. */
255  /* Now reconstruct starts array to index into indices_to. */
256  for (int i = nprocs - 1; i; i--) {
257  starts[i] = starts[i - 1];
258  }
259  starts[0] = 0;
260  starts[nprocs] = nactive;
261 
262  /* Construct lengths_to, starts_to and procs_to arrays. */
263  /* Note: If indices_to is needed, procs are in increasing order */
264  lengths_to.resize(nsends);
265  starts_to.resize(nsends);
266  procs_to.resize(nsends);
267 
268  int j = 0;
269  max_send_size = 0;
270  for (int i = 0; i < nprocs; i++) {
271  if (starts[i + 1] != starts[i]) {
272  starts_to[j] = starts[i];
273  lengths_to[j] = starts[i + 1] - starts[i];
274  if (i != my_proc && lengths_to[j] > max_send_size) {
275  max_send_size = lengths_to[j];
276  }
277  procs_to[j] = i;
278  j++;
279  }
280  }
281  }
282 
283  /* Now change nsends to count only non-self messages */
284  nsends -= self_msg;
285 
286  /* Determine how many messages & what length I'll receive. */
287  Teuchos::ArrayRCP<int> lengths_from; /* lengths of messages I'll receive */
288  Teuchos::ArrayRCP<int> procs_from; /* processors I'll receive from */
289  int out_of_mem = 0; // TODO refactor this bit
290 
291  int comm_flag = invert_map(lengths_to, procs_to, nsends, self_msg,
292  lengths_from, procs_from, &nrecvs, my_proc, nprocs,
293  out_of_mem, tag, comm);
294 
295  /* pointers for where to put recv data */
296  Teuchos::ArrayRCP<int> starts_from(
297  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
298  int j = 0;
299  for (int i = 0; i < nrecvs + self_msg; i++) {
300  starts_from[i] = j;
301  j += lengths_from[i];
302  }
303 
304  if (comm_flag != 0) {
305  throw std::logic_error("Failed to construct Zoltan2_Directory_Comm");
306  }
307 
308  int total_recv_size = 0; /* total size of messages I recv */
309  for (int i = 0; i < nrecvs + self_msg; i++) {
310  total_recv_size += lengths_from[i];
311  }
312 
313  plan_forward = new Zoltan2_Directory_Plan;
314  plan_forward->lengths_to = lengths_to;
315  plan_forward->starts_to = starts_to;
316  plan_forward->procs_to = procs_to;
317  plan_forward->indices_to = indices_to;
318  plan_forward->lengths_from = lengths_from;
319  plan_forward->starts_from = starts_from;
320  plan_forward->procs_from = procs_from;
321  plan_forward->nvals = nvals;
322  plan_forward->nvals_recv = total_recv_size;
323  plan_forward->nrecvs = nrecvs;
324  plan_forward->nsends = nsends;
325  plan_forward->self_msg = self_msg;
326  plan_forward->max_send_size = max_send_size;
327  plan_forward->total_recv_size = total_recv_size;
328  plan_forward->maxed_recvs = 0;
329  plan_forward->comm = comm;
330 
331  if (MPI_RECV_LIMIT > 0) {
332 
333  throw std::logic_error("UNTESTED COMM 2"); // needs unit testing
334 
335  /* If we have a limit to the number of posted receives we are allowed,
336  ** and our plan has exceeded that, then switch to an MPI_Alltoallv so
337  ** that we will have fewer receives posted when we do the communication.
338  */
339  int global_nrecvs;
340  Teuchos::reduceAll(*comm, Teuchos::REDUCE_MAX, 1, &nrecvs, &global_nrecvs);
341  if (global_nrecvs > MPI_RECV_LIMIT){
342  plan_forward->maxed_recvs = 1;
343  }
344  }
345 
346  if (plan_forward->maxed_recvs == 0) {
347  // See notes in header for MPI_Request
348  plan_forward->request.resize(plan_forward->nrecvs);
349  plan_forward->status.resize(plan_forward->nrecvs);
350  // plan_forward->request = Teuchos::arcp(new MPI_Request[plan_forward->nrecvs], 0, plan_forward->nrecvs, true);
351  // plan_forward->status = Teuchos::arcp(new MPI_Status[plan_forward->nrecvs], 0, plan_forward->nrecvs, true);
352  }
353 
354  nrec = total_recv_size;
355 }
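// Usage sketch (illustrative only, not part of this file): building a plan and
// moving one byte per value with do_forward(). Assumes MPI/Teuchos are already
// initialized by the caller; the receive-buffer sizing shown is a stand-in for
// however the caller learns the plan's total receive size (nrec above).
//
//   Teuchos::RCP<const Teuchos::Comm<int> > comm =
//     Teuchos::DefaultComm<int>::getComm();
//   const int nvals = 4;
//   Teuchos::ArrayRCP<int> assign = Teuchos::arcp<int>(nvals);
//   for (int i = 0; i < nvals; ++i) {
//     assign[i] = i % comm->getSize();          // destination rank for value i
//   }
//   Zoltan2_Directory_Comm dirComm(nvals, assign, comm, 12345);
//   Teuchos::ArrayRCP<char> send_data = Teuchos::arcp<char>(nvals);
//   Teuchos::ArrayRCP<char> recv_data =
//     Teuchos::arcp<char>(/* total receive size of the plan */ nvals);
//   dirComm.do_forward(12346, send_data, 1 /* nbytes */, recv_data);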
356 
357 Zoltan2_Directory_Comm::~Zoltan2_Directory_Comm()
358 {
359  delete plan_forward;
360 }
361 
362 int Zoltan2_Directory_Comm::invert_map(
363  const Teuchos::ArrayRCP<int> &lengths_to, /* number of items I'm sending */
364  const Teuchos::ArrayRCP<int> &procs_to, /* procs I send to */
365  int nsends, /* number of messages I'll send */
366  int self_msg, /* do I copy data to myself? */
367  Teuchos::ArrayRCP<int> &lengths_from, /* number of items I'm receiving */
368  Teuchos::ArrayRCP<int> &procs_from, /* procs I recv lengths from */
369  int *pnrecvs, /* number of messages I receive */
370  int my_proc, /* my processor number */
371  int nprocs, /* total number of processors */
372  int /* out_of_mem */, /* tell everyone I'm out of memory? */
373  int tag, /* message tag I can use */
374  Teuchos::RCP<const Teuchos::Comm<int> > comm) /* communicator */
375 {
376  Teuchos::ArrayRCP<int> msg_count(new int[nprocs], 0, nprocs, true);
377  Teuchos::ArrayRCP<int> counts(new int[nprocs], 0, nprocs, true);
378  for(int i = 0; i < nprocs; ++i) {
379  msg_count[i] = 0;
380  counts[i] = 1;
381  }
382 
383  for (int i = 0; i < nsends + self_msg; i++) {
384  msg_count[procs_to[i]] = 1;
385  }
386 
387  /*
388  * KDDKDD: Replaced MPI_Reduce_scatter with MPI_Reduce and MPI_Scatter
389  * KDDKDD: to avoid reported problems with MPICH 1.5.2.1.
390  * KDDKDD: Some sort of MPI_TYPE_INDEXED error.
391  * KDDKDD: Bug fix suggested by Clark Dohrmann and Rob Hoekstra.
392  * KDDKDD: July 20, 2004
393 
394  MPI_Reduce_scatter((void *) msg_count, (void *) &nrecvs, counts, MPI_INT,
395  MPI_SUM, comm);
396  */
397  Teuchos::reduceAll<int>(*comm, Teuchos::REDUCE_SUM, nprocs,
398  msg_count.getRawPtr(), counts.getRawPtr());
399 
400  int nrecvs = 0; /* number of messages I'll receive */
401 
402  // TODO: Teuchos::scatterImpl(...)
403  MPI_Scatter(&(counts[0]), 1, MPI_INT, &nrecvs, 1, MPI_INT, 0, getRawComm());
404 
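// For reference: the reduceAll + MPI_Scatter pair above stands in for a single
// reduce-scatter (see the KDDKDD note). With a current MPI this could be
// written as one call (sketch only; mpi_comm stands for the raw MPI_Comm
// obtained from getRawComm()):
//   MPI_Reduce_scatter_block(msg_count.getRawPtr(), &nrecvs, 1, MPI_INT,
//                            MPI_SUM, mpi_comm);
// i.e. rank i ends up with the number of ranks that will send to it.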
405  int max_nrecvs = 0;
406  if (my_proc == 0) {
407  for (int i=0; i < nprocs; i++) {
408  if (counts[i] > max_nrecvs) {
409  max_nrecvs = counts[i];
410  }
411  }
412  }
413 
414  // TODO: Teuchos::MpiComm<Ordinal>::broadcast(...)
415 
416  MPI_Bcast(&max_nrecvs, 1, MPI_INT, 0, getRawComm());
417 
418  if(nrecvs > 0) {
419  lengths_from.resize(nrecvs); /* number of items I'm receiving */
420  procs_from.resize(nrecvs); /* processors I'll receive from */
421  }
422 
423  if (MPI_RECV_LIMIT == 0 || max_nrecvs <= MPI_RECV_LIMIT) {
424  // See notes in header for MPI_Request
425  std::vector<MPI_Request> req(nrecvs);
426  //Teuchos::ArrayRCP<MPI_Request> req(new MPI_Request[nrecvs], 0, nrecvs, true);
427 
428  /* Note: I'm counting on having a unique tag or some of my incoming */
429  /* messages might get confused with others. */
430 
431  // TODO: Teuchos::ireceiveImpl(...)
432  for (int i=0; i < nrecvs; i++) {
433  MPI_Irecv(&(lengths_from[0]) + i, 1, MPI_INT, MPI_ANY_SOURCE,
434  tag, getRawComm(), &(req[i]));
435  }
436 
437  // TODO: Teuchos::sendImpl(...)
438  for (int i=0; i < nsends+self_msg; i++) {
439  // MPI_Send takes non-const for buf (1st param)
440  // Apparently in future versions this will become true const
441  // Then the const_cast could be removed
442  MPI_Send(const_cast<int*>(&lengths_to[i]), 1, MPI_INT, procs_to[i], tag,
443  getRawComm());
444  }
445 
446  for (int i=0; i < nrecvs; i++) {
447  MPI_Status status;
448  MPI_Wait(&(req[i]), &status);
449  procs_from[i] = status.MPI_SOURCE;
450  }
451 
452  }
453  else { /* some large HPC machines have a limit on number of posted receives */
454  throw std::logic_error("UNTESTED COMM 3"); // needs unit testing
455 
456  Teuchos::ArrayRCP<int> sendbuf(new int[nprocs], 0, nprocs, true);
457  Teuchos::ArrayRCP<int> recvbuf(new int[nprocs], 0, nprocs, true);
458 
459  for (int i=0; i < nsends + self_msg; i++) {
460  sendbuf[procs_to[i]] = lengths_to[i];
461  }
462 
463  // TODO: Teuchos
464  MPI_Alltoall(&(sendbuf[0]), 1, MPI_INT, &(recvbuf[0]), 1, MPI_INT,
465  getRawComm());
466 
467  for (int i=0, j=0; i < nprocs; i++) {
468  if (recvbuf[i] > 0){
469  lengths_from[j] = recvbuf[i];
470  procs_from[j] = i;
471  if (++j == nrecvs) {
472  break;
473  }
474  }
475  }
476  }
477 
478  /* Sort recv lists to keep execution deterministic (e.g. for debugging) */
479  sort_ints(procs_from, lengths_from);
480 
481  *pnrecvs = nrecvs - self_msg; /* Only return number of true messages */
482 
483  return 0;
484 }
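// Worked example of the handshake above (illustration only), for three ranks:
//   rank 0 sends lengths_to = {3, 5} to procs_to = {1, 2}
//   rank 1 sends lengths_to = {2}    to procs_to = {0}
//   rank 2 sends lengths_to = {4, 1} to procs_to = {0, 1}
// The reduce/scatter step tells ranks 0, 1 and 2 to expect 2, 2 and 1 incoming
// messages respectively; after the Irecv/Send exchange and the sort below,
//   rank 0 has lengths_from = {2, 4} from procs_from = {1, 2}
//   rank 1 has lengths_from = {3, 1} from procs_from = {0, 2}
//   rank 2 has lengths_from = {5}    from procs_from = {0}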
485 
486 int Zoltan2_Directory_Comm::sort_ints(
487  Teuchos::ArrayRCP<int> &vals_sort, /* values to be sorted */
488  Teuchos::ArrayRCP<int> &vals_other) /* other array to be reordered w/ sort */
489 {
490  // TODO: Check - perhaps we can skip all of these for efficiency
491  if (vals_sort == Teuchos::null || vals_sort.size() == 0) {
492  return 1;
493  }
494  if (vals_other == Teuchos::null || vals_other.size() == 0) {
495  return 1;
496  }
497  if (vals_sort == Teuchos::null || vals_sort.size() == 1) {
498  return 0; /* fastest way to sort 1 item is to return */
499  }
500 
501  /* find largest value (sort sometimes used for non processor lists) */
502  int already_sorted = 1; /* flag indicating whether vals_sort is
503  already sorted; can exit early and skip
504  memory allocations if it is. */
505  int top = vals_sort[0]; /* largest integer to sort, smallest is assumed 0 */
506  for (Teuchos::ArrayRCP<int>::size_type i = 1; i < vals_sort.size(); i++) {
507  if (vals_sort[i-1] > vals_sort[i]) {
508  already_sorted = 0;
509  }
510  if (top < vals_sort[i]) {
511  top = vals_sort[i];
512  }
513  }
514 
515  if (already_sorted) {
516  return 0;
517  }
518 
519  Teuchos::ArrayRCP<int> store(new int[top+2], 0, top+2, true);
520  for(int n = 0; n < store.size(); ++n) {
521  store[n] = 0;
522  }
523 
524  Teuchos::ArrayRCP<int> copy_sort(new int[vals_sort.size()], 0, vals_sort.size(), true);
525  for(Teuchos::ArrayRCP<int>::size_type n = 0; n < copy_sort.size(); ++n) {
526  copy_sort[n] = vals_sort[n]; // TODO - use deepCopy method?
527  }
528 
529  Teuchos::ArrayRCP<int> copy_other(new int[vals_other.size()], 0, vals_other.size(), true);
530  for(Teuchos::ArrayRCP<int>::size_type n = 0; n < copy_other.size(); ++n) {
531  copy_other[n] = vals_other[n]; // TODO - use deepCopy method?
532  }
533 
534  // TODO: May want to modernize this ptr handling - however I didn't want
535  // to introduce inefficiencies so for now have kept the original structure
536  int *p = &(store[1]);
537  for (Teuchos::ArrayRCP<int>::size_type i = 0; i < vals_sort.size(); i++) {
538  p[copy_sort[i]]++; /* count number of occurrences */
539  }
540 
541  for (int i = 1; i < top+1; i++) {
542  p[i] += p[i-1]; /* compute partial sums */
543  }
544  /* assert: p[top] = nvals */
545  p = &(store[0]); /* effectively shifts down by one */
546  for (Teuchos::ArrayRCP<int>::size_type i = 0; i < vals_sort.size(); i++) {
547  vals_sort[p[copy_sort[i]]] = copy_sort[i];
548  vals_other[p[copy_sort[i]]] = copy_other[i];
549  ++p[copy_sort[i]];
550  }
551 
552  return 0;
553 }
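// Example of the counting sort above (illustration only): vals_sort = {3, 1, 2}
// with vals_other = {30, 10, 20} comes back as vals_sort = {1, 2, 3} and
// vals_other = {10, 20, 30}; the companion array is permuted in tandem with the
// keys. Keys are assumed non-negative (smallest taken as 0), which holds for
// the processor ranks and offsets this routine is used on.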
554 
555 int Zoltan2_Directory_Comm::do_forward(
556  int tag, /* message tag for communicating */
557  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
558  int nbytes, /* msg size */
559  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
560 {
561  int status = 0;
562 
563  if (!plan_forward->maxed_recvs) {
564  status = do_post (plan_forward, tag, send_data, nbytes, recv_data);
565  if (status == 0) {
566  status = do_wait (plan_forward, tag, send_data, nbytes, recv_data);
567  }
568  }
569  else {
570  status = do_all_to_all(plan_forward, send_data, nbytes, recv_data);
571  }
572 
573  return status;
574 }
575 
576 int Zoltan2_Directory_Comm::do_post(
577  Zoltan2_Directory_Plan *plan, /* communication data structure */
578  int tag, /* message tag for communicating */
579  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
580  int nbytes, /* msg size */
581  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
582 {
583  /* Check input parameters */
584  if (!plan) {
585  throw std::logic_error("Communication plan = NULL");
586  }
587 
588  /* If not point to point, currently we do synchronous communications */
589  if (plan->maxed_recvs) {
590  throw std::logic_error("UNTESTED COMM 4"); // needs unit testing
591  return do_all_to_all(plan, send_data, nbytes, recv_data);
592  }
593 
594  int my_proc = plan->comm->getRank(); /* processor ID */
595  if ((plan->nsends + plan->self_msg) && !send_data.size()) {
596  throw std::logic_error("UNTESTED COMM 5"); // needs unit testing
597  size_t sum = 0;
598  if (plan->sizes_to.size()) { /* Not an error if all sizes_to == 0 */
599  for (int i = 0; i < (plan->nsends + plan->self_msg); i++) {
600  sum += plan->sizes_to[i];
601  }
602  }
603  if (!plan->sizes_to.size() || (plan->sizes_to.size() && sum)) {
604  throw std::logic_error("nsends not zero, but send_data = NULL");
605  }
606  }
607  if ((plan->nrecvs + plan->self_msg) && recv_data == Teuchos::null) {
608  throw std::logic_error("UNTESTED COMM 6"); // needs unit testing
609  size_t sum = 0;
610  if (plan->sizes_from != Teuchos::null) /* Not an error if all sizes_from == 0 */
611  for (int i = 0; i < (plan->nrecvs + plan->self_msg); i++)
612  sum += plan->sizes_from[i];
613  if (!plan->sizes_from.size() || (plan->sizes_from.size() && sum)) {
614  throw std::logic_error("nrecvs not zero, but recv_data = NULL");
615  }
616  }
617 
618  /* Post irecvs */
619  if (plan->indices_from == Teuchos::null) {
620  /* Data can go directly into user space. */
621  plan->recv_buff = recv_data;
622  }
623  else { /* Need to buffer receive to reorder */
624  size_t rsize = (size_t) (plan->total_recv_size) * (size_t) nbytes;
625  plan->recv_buff = Teuchos::arcp(new char[rsize], 0, rsize, true);
626  }
627 
628  size_t self_recv_address = 0; /* where in recv_data self info starts */
629  if (!plan->using_sizes) { /* All data the same size */
630  int k = 0;
631  for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
632  if (plan->procs_from[i] != my_proc) {
633  // TODO: Teuchos::ireceiveImpl(...)
634  MPI_Irecv((void *)
635  &plan->getRecvBuff().getRawPtr()[(size_t)(plan->starts_from[i])*(size_t)nbytes],
636  plan->lengths_from[i] * nbytes,
637  (MPI_Datatype) MPI_BYTE, plan->procs_from[i], tag,
638  getRawComm(), &plan->request[k]);
639  k++;
640  }
641  else {
642  self_recv_address = (size_t)(plan->starts_from[i]) * (size_t)nbytes;
643  }
644  }
645  }
646  else { /* Data of varying sizes */
647  int k = 0;
648  for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
649  if (plan->procs_from[i] != my_proc) {
650  if (plan->sizes_from[i]) {
651  // TODO: Teuchos::ireceiveImpl(...)
652  MPI_Irecv((void *)
653  &plan->getRecvBuff().getRawPtr()[(size_t)(plan->starts_from_ptr[i])
654  * (size_t)nbytes],
655  plan->sizes_from[i] * nbytes,
656  (MPI_Datatype) MPI_BYTE, plan->procs_from[i],
657  tag, getRawComm(), &plan->request[k]);
658  }
659  else
660  plan->request[k] = MPI_REQUEST_NULL;
661  k++;
662  }
663  else {
664  self_recv_address =
665  (size_t)(plan->starts_from_ptr[i]) * (size_t)nbytes;
666  }
667  }
668  }
669 
670  Teuchos::ArrayRCP<char> send_buff;
671  if(plan->indices_to != Teuchos::null) {
672  send_buff = Teuchos::arcp(new char[plan->max_send_size * nbytes], 0, plan->max_send_size * nbytes, true);
673  }
674 
675  /* Barrier to ensure irecvs are posted before doing any sends. */
676  /* Simultaneously see if anyone out of memory */
677  int out_of_mem = 0;
678  // WARNING - do not delete this without adding a proper barrier as a replacement.
679  // I'm refactoring memory handling, so the new version probably won't use
680  // out_of_mem, but we must still preserve a barrier here or we get
681  // intermittent failures.
682  // I'll keep the memory reduce for now since we may end up with memory
683  // handling anyway.
684  int global_out_of_mem;
685  Teuchos::reduceAll(*plan->comm, Teuchos::REDUCE_SUM, 1, &out_of_mem,
686  &global_out_of_mem);
687 
688  /* Send out data */
689 
690  /* Scan through procs_to list to start w/ higher numbered procs */
691  /* This should balance message traffic. */
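// Example (illustration only): with my_proc = 2 and procs_to = {0, 1, 3, 4},
// proc_index lands on the entry for rank 3, so the sends below go out in the
// order 3, 4, 0, 1 instead of every rank hammering rank 0 first.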
692 
693  int nblocks = plan->nsends + plan->self_msg; /* # procs needing my data */
694  int proc_index = 0; /* loop counter over procs to send to */
695  while (proc_index < nblocks && plan->procs_to[proc_index] < my_proc) {
696  proc_index++;
697  }
698  if (proc_index == nblocks) {
699  proc_index = 0;
700  }
701 
702  if (!plan->using_sizes) { /* Data all of same size */
703  if (plan->indices_to == Teuchos::null) { /* data already blocked by processor. */
704  int self_num = 0; /* where in send list my_proc appears */
705  for (int i = proc_index, j = 0; j < nblocks; j++) {
706  if (plan->procs_to[i] != my_proc) {
707  // TODO: Teuchos::readySend(...)
708  MPI_Rsend(
709  (void *) &send_data[(size_t)(plan->starts_to[i])*(size_t)nbytes],
710  plan->lengths_to[i] * nbytes, (MPI_Datatype) MPI_BYTE,
711  plan->procs_to[i], tag, getRawComm());
712  }
713  else {
714  self_num = i;
715  }
716  if (++i == nblocks) {
717  i = 0;
718  }
719  }
720 
721  if (plan->self_msg) { /* Copy data to self. */
722  /* I use array+offset instead of &(array[offset]) because of
723  a bug with PGI v9 */
724  /* I use memmove because I'm not sure that the source and
725  destination don't overlap. */
726 
727  // TODO: Refactor to make C++ and remove getRawPtr, etc
728  memmove(
729  plan->getRecvBuff().getRawPtr()+self_recv_address,
730  send_data.getRawPtr()+(size_t)(plan->starts_to[self_num])*(size_t)nbytes,
731  (size_t) (plan->lengths_to[self_num]) * (size_t) nbytes);
732  }
733  }
734  else { /* Not blocked by processor. Need to buffer. */
735  int self_index = 0; /* send offset for data I'm keeping */
736  int self_num = 0; /* where in send list my_proc appears */
737  for (int i = proc_index, jj = 0; jj < nblocks; jj++) {
738  if (plan->procs_to[i] != my_proc) {
739  /* Need to pack message first. */
740  size_t offset = 0; /* offset into array I'm copying into */
741  int j = plan->starts_to[i];
742  for (int k = 0; k < plan->lengths_to[i]; k++) {
743  memcpy(&send_buff[offset],
744  &send_data[(size_t)(plan->indices_to[j++]) * (size_t)nbytes], nbytes);
745  offset += nbytes;
746  }
747  // TODO: Teuchos::readySend(...)
748  MPI_Rsend((void *) &(send_buff[0]), plan->lengths_to[i] * nbytes,
749  (MPI_Datatype) MPI_BYTE, plan->procs_to[i], tag,
750  getRawComm());
751  }
752  else {
753  self_num = i;
754  self_index = plan->starts_to[i];
755  }
756  if (++i == nblocks)
757  i = 0;
758  }
759 
760  if (plan->self_msg) { /* Copy data to self. */
761  for (int k = 0; k < plan->lengths_to[self_num]; k++) {
762  memcpy(&(plan->getRecvBuff())[self_recv_address],
763  &send_data[
764  (size_t)(plan->indices_to[self_index++]) * (size_t)nbytes],
765  nbytes);
766  self_recv_address += nbytes;
767  }
768  }
769  }
770  }
771  else { /* Data of differing sizes */
772  if (plan->indices_to == Teuchos::null) { /* data already blocked by processor. */
773  int self_num = 0; /* where in send list my_proc appears */
774  for (int i = proc_index, j = 0; j < nblocks; j++) {
775  if (plan->procs_to[i] != my_proc) {
776  if (plan->sizes_to[i]) {
777  // TODO: Teuchos::readySend(...)
778  MPI_Rsend((void *) &send_data[(size_t)(
779  plan->starts_to_ptr[i]) * (size_t)nbytes],
780  plan->sizes_to[i] * nbytes,
781  (MPI_Datatype) MPI_BYTE, plan->procs_to[i],
782  tag, getRawComm());
783  }
784  }
785  else
786  self_num = i;
787 
788  if (++i == nblocks)
789  i = 0;
790  }
791  if (plan->self_msg) { /* Copy data to self. */
792  if (plan->sizes_to[self_num]) {
793  char* lrecv = &plan->getRecvBuff().getRawPtr()[self_recv_address];
794  const char* lsend =
795  &send_data.getRawPtr()[(size_t)(plan->starts_to_ptr[self_num]) * (size_t)nbytes];
796  int sindex = plan->sizes_to[self_num], idx;
797  for (idx=0; idx<nbytes; idx++) {
798  memcpy(lrecv, lsend, sindex);
799  lrecv += sindex;
800  lsend += sindex;
801  }
802  }
803  }
804  }
805  else { /* Not blocked by processor. Need to buffer. */
806  int self_num = 0; /* where in send list my_proc appears */
807  for (int i = proc_index, jj = 0; jj < nblocks; jj++) {
808  if (plan->procs_to[i] != my_proc) {
809  size_t offset = 0; /* offset into array I'm copying into */
810  int j = plan->starts_to[i];
811  for (int k = 0; k < plan->lengths_to[i]; k++) {
812  if (plan->sizes[plan->indices_to[j]]) {
813  memcpy(&send_buff[offset],
814  &send_data[(size_t)(plan->indices_to_ptr[j]) * (size_t)nbytes],
815  (size_t)(plan->sizes[plan->indices_to[j]]) * (size_t)nbytes);
816  offset +=
817  (size_t)(plan->sizes[plan->indices_to[j]]) * (size_t)nbytes;
818  }
819  j++;
820  }
821  if (plan->sizes_to[i]) {
822  // TODO: Teuchos::readySend(...)
823  MPI_Rsend((void *) &(send_buff[0]), plan->sizes_to[i] * nbytes,
824  (MPI_Datatype) MPI_BYTE, plan->procs_to[i], tag, getRawComm());
825  }
826  }
827  else
828  self_num = i;
829 
830  if (++i == nblocks)
831  i = 0;
832  }
833  if (plan->self_msg) { /* Copy data to self. */
834  if (plan->sizes_to[self_num]) {
835  int j = plan->starts_to[self_num];
836  for (int k = 0; k < plan->lengths_to[self_num]; k++) {
837  int kk = plan->indices_to_ptr[j];
838  char* lrecv = &(plan->getRecvBuff())[self_recv_address];
839  size_t send_idx = (size_t)kk * (size_t)nbytes;
840  const char* lsend = &send_data[send_idx];
841  int sindex = plan->sizes[plan->indices_to[j]], idx;
842  for (idx=0; idx<nbytes; idx++) {
843  memcpy(lrecv, lsend, sindex);
844  lrecv += sindex;
845  lsend += sindex;
846  }
847  self_recv_address += (size_t)(plan->sizes[plan->indices_to[j]])
848  * (size_t) nbytes;
849  j++;
850  }
851  }
852  }
853  }
854  }
855 
856  return 0;
857 }
858 
859 int Zoltan2_Directory_Comm::do_wait(
860  Zoltan2_Directory_Plan *plan, /* communication data structure */
861  int /* tag */, /* message tag for communicating */
862  const Teuchos::ArrayRCP<char> &/* send_data */, /* array of data I currently own */
863  int nbytes, /* msg size */
864  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
865 {
866  /* If not point to point, currently we do synchronous communications */
867  if (plan->maxed_recvs){
868  /* Do nothing */
869  return 0;
870  }
871 
872  int my_proc = plan->comm->getRank(); /* processor ID */
873 
874  /* Wait for messages to arrive & unpack them if necessary. */
875  /* Note: since request is in plan, could wait in later routine. */
876  if (plan->indices_from == Teuchos::null) { /* No copying required */
877  if (plan->nrecvs > 0) {
878  // TODO: Teuchos::waitAllImpl(...)
879  MPI_Waitall(plan->nrecvs, &plan->request[0], &plan->status[0]);
880  //MPI_Waitall(plan->nrecvs, plan->request.getRawPtr(), plan->status.getRawPtr());
881  }
882  }
883  else { /* Need to copy into recv_data. */
884  int self_num; /* where in send list my_proc appears */
885  size_t offsetDst = 0;
886  if (plan->self_msg) { /* Unpack own data before waiting */
887  for (self_num = 0; self_num < plan->nrecvs + plan->self_msg;
888  self_num++) {
889  if (plan->procs_from[self_num] == my_proc) {
890  break;
891  }
892  }
893 
894  if(plan->sizes_from.size()) {
895  // NEW METHOD for variable sized data
896  // This will NOT put the data in order but during the directory read
897  // of the buffer which follows, this data gets sorted anyways since each
898  // element has an index to tell where it belongs. I am not sure yet if
899  // we must sort here, or if it's fine to allow the sort to happen through
900  // the index value. This needs further discussion.
901  memcpy(&recv_data[offsetDst * (size_t)nbytes],
902  &(plan->getRecvBuff())[plan->starts_from_ptr[self_num] * (size_t)nbytes],
903  plan->sizes_from[self_num] * (size_t)nbytes);
904  offsetDst += plan->sizes_from[self_num];
905  }
906  else {
907  int k = plan->starts_from[self_num];
908  for (int j = plan->lengths_from[self_num]; j; j--) {
909  memcpy(&recv_data[(size_t)(plan->indices_from[k]) * (size_t)nbytes],
910  &(plan->getRecvBuff())[(size_t)k * (size_t)nbytes], nbytes);
911  k++;
912  }
913  }
914  }
915  else {
916  self_num = plan->nrecvs;
917  }
918 
919  for (int jj = 0; jj < plan->nrecvs; jj++) {
920  MPI_Status status; /* return from Waitany */
921  int index;
922  MPI_Waitany(plan->nrecvs, &plan->request[0], &index, &status);
923  //MPI_Waitany(plan->nrecvs, plan->request.getRawPtr(), &index, &status);
924 
925  if (index == MPI_UNDEFINED) {
926  break; /* No more receives */
927  }
928 
929  if (index >= self_num) {
930  index++;
931  }
932 
933  if(plan->sizes_from.size()) {
934  // NEW METHOD for variable sized data
935  // This will NOT put the data in order but during the directory read
936  // of the buffer which follows, this data gets sorted anyways since each
937  // element has an index to tell where it belongs. I am not sure yet if
938  // we must sort here, or if it's fine to allow the sort to happen through
939  // the index value. This needs further discussion.
940  memcpy(&recv_data[offsetDst * (size_t)nbytes],
941  &plan->getRecvBuff().getRawPtr()[plan->starts_from_ptr[index] * (size_t)nbytes],
942  plan->sizes_from[index] * (size_t)nbytes);
943  offsetDst += plan->sizes_from[index];
944  }
945  else {
946  int k = plan->starts_from[index];
947  for (int j = plan->lengths_from[index]; j; j--) {
948  memcpy(&recv_data.getRawPtr()[(size_t)(plan->indices_from[k]) * (size_t)nbytes],
949  &plan->getRecvBuff().getRawPtr()[(size_t)k * (size_t)nbytes], nbytes);
950  k++;
951  }
952  }
953  }
954  }
955 
956  return 0;
957 }
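// Unpacking example for the indices_from path above (illustration only): if a
// message of three items starts at buffer position k and indices_from holds
// {..., 5, 2, 7, ...} at positions k, k+1, k+2, those items are copied to
// slots 5, 2 and 7 of recv_data (each slot nbytes wide), restoring the
// caller's local ordering rather than the arrival order.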
958 
959 /* Do_Post would require posting more receives than allowed on this platform.
960 * We use MPI_Alltoallv instead, which is probably implemented such that each
961 * process does one receive at a time.
962 */
963 
964 int Zoltan2_Directory_Comm::do_all_to_all(
965  Zoltan2_Directory_Plan *plan, /* communication data structure */
966  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
967  int nbytes, /* msg size */
968  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
969 {
970  throw std::logic_error("UNTESTED COMM 10"); // needs unit testing
971 
972  int sm = (plan->self_msg > 0) ? 1 : 0;
973 
974  int nSendMsgs = plan->nsends + sm;
975  int nRecvMsgs = plan->nrecvs + sm;
976 
977  int nSendItems = 0;
978  for (int i=0; i <nSendMsgs; i++) {
979  nSendItems += plan->lengths_to[i];
980  }
981  int nRecvItems = 0;
982  for (int i=0; i <nRecvMsgs; i++) {
983  nRecvItems += plan->lengths_from[i];
984  }
985 
986  int nprocs = plan->comm->getSize();
987 
988  Teuchos::ArrayRCP<int> outbufCounts(new int[nprocs], 0, nprocs, true);
989  Teuchos::ArrayRCP<int> outbufOffsets(new int[nprocs], 0, nprocs, true);
990  Teuchos::ArrayRCP<int> inbufCounts(new int[nprocs], 0, nprocs, true);
991  Teuchos::ArrayRCP<int> inbufOffsets(new int[nprocs], 0, nprocs, true);
992 
993  /* The *_to fields of the plan refer to the items in the send_data buffer,
994  * and how to pull out the correct items for each receiver. The
995  * *_from fields of the plan refer to the recv_data buffer. Items
996  * arrive in process rank order, and these fields tell us where to
997  * put them in the recv_data buffer.
998  */
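// Example of the count/offset layout built below (illustration only): with
// nprocs = 3, nbytes = 8, and this rank sending 2 items to rank 0 and 5 items
// to rank 2, the fixed-size path produces outbufCounts = {16, 0, 40} and
// outbufOffsets = {0, 16, 16}, which is exactly the per-destination byte
// layout MPI_Alltoallv expects.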
999 
1000  /* CREATE SEND BUFFER */
1001 
1002  int sorted = 0;
1003  if (plan->indices_to == Teuchos::null){
1004  sorted = 1;
1005  for (int i=1; i< nSendMsgs; i++){
1006  if (plan->starts_to[i] < plan->starts_to[i-1]){
1007  sorted = 0;
1008  break;
1009  }
1010  }
1011  }
1012 
1013  Teuchos::ArrayRCP<char> outbuf;
1014  Teuchos::ArrayRCP<char> inbuf;
1015  Teuchos::ArrayRCP<char> buf;
1016 
1017  if (plan->sizes_to.size()){
1018  /*
1019  * Each message contains items for a process, and each item may be
1020  * a different size.
1021  */
1022 
1023  int outbufLen = 0;
1024  for (int i = 0; i < nSendMsgs; i++){
1025  outbufLen += plan->sizes_to[i];
1026  }
1027 
1028  if (plan->indices_to != Teuchos::null) {
1029  /*
1030  * items are not grouped by message
1031  */
1032  buf.resize(outbufLen*nbytes);
1033  outbuf.resize(outbufLen*nbytes);
1034  char * pBufPtr = &(outbuf[0]);
1035  int i = 0;
1036  int k = 0;
1037  for (int p = 0; p < nprocs; p++) {
1038 
1039  int length = 0;
1040 
1041  if (i < nSendMsgs){
1042  if (plan->procs_to[i] == p){ /* procs_to is sorted */
1043 
1044  for (int j=0; j < plan->lengths_to[i]; j++,k++){
1045  int itemSize = plan->sizes[plan->indices_to[k]] * nbytes;
1046  int offset = plan->indices_to_ptr[k] * nbytes;
1047 
1048  memcpy(pBufPtr, &(send_data[0]) + offset, itemSize);
1049 
1050  pBufPtr += itemSize;
1051  length += itemSize;
1052  }
1053  i++;
1054  }
1055  }
1056 
1057  outbufCounts[p] = length;
1058  if (p){
1059  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1060  }
1061  }
1062  }
1063  else{
1064  /*
1065  * items are stored contiguously for each message
1066  */
1067 
1068  if (!sorted || (plan->nvals > nSendItems) ){
1069  buf.resize(outbufLen*nbytes);
1070  outbuf.resize(outbufLen*nbytes);
1071  }
1072  else{
1073  /* All items in send_data are being sent, and they are sorted
1074  * in process rank order.
1075  */
1076  // TODO: Optimize - original just set the ptr...
1077  for(int n = 0; n < outbufLen*nbytes; ++n) {
1078  outbuf[n] = send_data[n];
1079  }
1080  }
1081 
1082  char * pBufPtr = &(outbuf[0]);
1083 
1084  int i = 0;
1085  for (int p = 0; p < nprocs; p++) {
1086 
1087  int length = 0;
1088 
1089  if (i < nSendMsgs){
1090  if (plan->procs_to[i] == p){ /* procs_to is sorted */
1091  length = plan->sizes_to[i] * nbytes;
1092  int offset = plan->starts_to_ptr[i] * nbytes;
1093 
1094  if ((!sorted || (plan->nvals > nSendItems)) && length){
1095  memcpy(pBufPtr, &(send_data[0]) + offset, length);
1096  pBufPtr += length;
1097  }
1098  i++;
1099  }
1100  }
1101 
1102  outbufCounts[p] = length;
1103  if (p){
1104  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1105  }
1106  }
1107  }
1108  }
1109  else if (plan->indices_to != Teuchos::null) {
1110  /*
1111  * item sizes are constant, however the items belonging in a given
1112  * message may not be contiguous in send_data
1113  */
1114  buf.resize(nSendItems*nbytes);
1115  outbuf.resize(nSendItems*nbytes);
1116  char * pBufPtr = &(outbuf[0]);
1117  int i = 0;
1118  int k = 0;
1119  for (int p = 0; p < nprocs; p++){
1120 
1121  int length = 0;
1122 
1123  if (i < nSendMsgs){
1124  if (plan->procs_to[i] == p){ /* procs_to is sorted */
1125  for (int j=0; j < plan->lengths_to[i]; j++,k++) {
1126  int offset = plan->indices_to[k] * nbytes;
1127  memcpy(pBufPtr, &(send_data[0]) + offset, nbytes);
1128  pBufPtr += nbytes;
1129  }
1130  length = plan->lengths_to[i] * nbytes;
1131  i++;
1132  }
1133  }
1134 
1135  outbufCounts[p] = length;
1136  if (p){
1137  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1138  }
1139  }
1140  }
1141  else{
1142 
1143  /* item sizes are constant, and items belonging to a
1144  * given message are always stored contiguously in send_data
1145  */
1146 
1147  if (!sorted || (plan->nvals > nSendItems)){
1148  buf.resize(nSendItems*nbytes);
1149  outbuf.resize(nSendItems*nbytes);
1150  }
1151  else{
1152  /* send_data is sorted by process, and we don't skip
1153  * any of the data in the buffer, so we can use send_data
1154  * in the alltoall call
1155  */
1156  // TODO: Optimize - original just set ptr
1157  outbuf = send_data;
1158  }
1159 
1160  char * pBufPtr = &(outbuf[0]);
1161 
1162  int i = 0;
1163  for (int p=0; p < nprocs; p++) {
1164 
1165  int length = 0;
1166 
1167  if (i < nSendMsgs){
1168  if (plan->procs_to[i] == p){ /* procs_to is sorted */
1169  int offset = plan->starts_to[i] * nbytes;
1170  length = plan->lengths_to[i] * nbytes;
1171 
1172  if ((!sorted || (plan->nvals > nSendItems)) && length){
1173  memcpy(pBufPtr, &(send_data[0]) + offset, length);
1174  pBufPtr += length;
1175  }
1176  i++;
1177  }
1178  }
1179 
1180  outbufCounts[p] = length;
1181  if (p){
1182  outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1183  }
1184  }
1185  }
1186 
1187  /* CREATE RECEIVE BUFFER */
1188 
1189  sorted = 0;
1190  int i;
1191  if (plan->indices_from == Teuchos::null) {
1192  sorted = 1;
1193  for (i=1; i< nRecvMsgs; i++) {
1194  if (plan->starts_from[i] < plan->starts_from[i-1]){
1195  sorted = 0;
1196  break;
1197  }
1198  }
1199  }
1200 
1201  if (sorted){
1202  /* Caller already expects received data to be ordered by
1203  * the sending process rank.
1204  */
1205 
1206  // TODO: Optimize - original just set ptr
1207  outbuf = send_data;
1208  inbuf = recv_data;
1209  }
1210  else {
1211  inbuf.resize(plan->total_recv_size * nbytes);
1212  }
1213  i = 0; /* restart the walk over recv messages for the counts below */
1214  for (int p = 0; p < nprocs; p++) {
1215  int length = 0;
1216 
1217  if (i < nRecvMsgs){
1218  if (plan->procs_from[i] == p){
1219 
1220  if (!plan->using_sizes){
1221  length = plan->lengths_from[i] * nbytes;
1222  }
1223  else{
1224  length = plan->sizes_from[i] * nbytes;
1225  }
1226  i++;
1227  }
1228  }
1229 
1230  inbufCounts[p] = length;
1231  if (p){
1232  inbufOffsets[p] = inbufOffsets[p-1] + inbufCounts[p-1];
1233  }
1234  }
1235 
1236  /* EXCHANGE DATA */
1237  MPI_Alltoallv(&(outbuf[0]), &(outbufCounts[0]), &(outbufOffsets[0]), MPI_BYTE,
1238  &(inbuf[0]), &(inbufCounts[0]), &(inbufOffsets[0]), MPI_BYTE, getRawComm());
1239 
1240  // TODO: Restore optimization - can we avoid the copy? The original just set the ptr
1241  //if (outbuf != send_data){
1242  // ZOLTAN_FREE(outbuf);
1243  //}
1244 
1245  /* WRITE RECEIVED DATA INTO USER'S BUFFER WHERE IT'S EXPECTED */
1246 
1247  if (!sorted){
1248 
1249  char * pBufPtr = &(inbuf[0]);
1250 
1251  if (!plan->using_sizes){
1252 
1253  /* each item in each message is nbytes long */
1254 
1255  if (plan->indices_from == Teuchos::null) {
1256  for (i=0; i < nRecvMsgs; i++){
1257  int offset = plan->starts_from[i] * nbytes;
1258  int length = plan->lengths_from[i] * nbytes;
1259  memcpy(&(recv_data[0]) + offset, pBufPtr, length);
1260  pBufPtr += length;
1261  }
1262  }
1263  else{
1264  int k = 0;
1265  for (i=0; i < nRecvMsgs; i++) {
1266 
1267  for (int j=0; j < plan->lengths_from[i]; j++,k++){
1268  int offset = plan->indices_from[k] * nbytes;
1269  memcpy(&(recv_data[0]) + offset, pBufPtr, nbytes);
1270  pBufPtr += nbytes;
1271  }
1272  }
1273  }
1274  }
1275  else{ /* (sizes!=NULL) && (indices_from!=NULL) not allowed by Zoltan_Comm_Resize */
1276 
1277  /* items can be different sizes */
1278 
1279  for (i=0; i < nRecvMsgs; i++){
1280  int offset = plan->starts_from_ptr[i] * nbytes;
1281  int length = plan->sizes_from[i] * nbytes;
1282  memcpy(&(recv_data[0]) + offset, pBufPtr, length);
1283  pBufPtr += length;
1284  }
1285  }
1286  }
1287 
1288  return 0;
1289 }
1290 
1291 int Zoltan2_Directory_Comm::do_reverse(
1292  int tag, /* message tag for communicating */
1293  const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
1294  int nbytes, /* msg size */
1295  const Teuchos::ArrayRCP<int> &sizes,
1296  Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after reverse comm */
1297 {
1298  /* create plan->plan_reverse
1299  */
1300  int status = create_reverse_plan(tag, sizes);
1301 
1302  // NEW METHOD
1303  // Set up recv_data with the proper size
1304  // This information is only available after the above create_reverse_plan is
1305  // called so we can setup the return data with proper buffer size now.
1306  // However should we do this here?
1307  size_t new_size = plan_forward->plan_reverse->total_recv_size*nbytes;
1308  if(new_size > 0) {
1309  // have to be careful with new[0], which will, for example, turn an
1310  // ArrayRCP getRawPtr() from a valid NULL read into a debug assert failure.
1311  recv_data = Teuchos::arcp(new char[new_size], 0, new_size, true);
1312  }
1313 
1314  if (status == 0) {
1315 
1316  if (plan_forward->plan_reverse->maxed_recvs) {
1317 
1318  throw std::logic_error("UNTESTED COMM 11"); // needs unit testing
1319 
1320  /* use MPI_Alltoallv to implement plan->plan_reverse, because comm_do_post
1321  * would post more receives than allowed on this machine
1322  */
1323 
1324  status = do_all_to_all(plan_forward->plan_reverse, send_data,
1325  nbytes, recv_data);
1326  }
1327  else {
1328  /* use post/wait which is faster when each sends to few
1329  */
1330  status = do_post(plan_forward->plan_reverse, tag, send_data,
1331  nbytes, recv_data);
1332 
1333  if (status == 0) {
1334  status = do_wait (plan_forward->plan_reverse, tag, send_data,
1335  nbytes, recv_data);
1336  }
1337  }
1338  }
1339 
1340  free_reverse_plan(plan_forward);
1341 
1342  return status;
1343 }
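// Usage sketch (illustrative only, continuing the constructor sketch above):
// replies travel back along the inverted plan. With uniform reply sizes an
// empty sizes array keeps resize() on the fixed-length path, and do_reverse()
// allocates recv_data itself:
//
//   Teuchos::ArrayRCP<char> reply_send = /* one byte per item received */;
//   Teuchos::ArrayRCP<char> reply_recv;  // sized inside do_reverse
//   dirComm.do_reverse(12347, reply_send, 1 /* nbytes */,
//                      Teuchos::ArrayRCP<int>() /* uniform sizes */, reply_recv);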
1344 
1345 void Zoltan2_Directory_Comm::free_reverse_plan(Zoltan2_Directory_Plan *plan)
1346 {
1347  if(!plan) {
1348  throw std::logic_error("Plan is NULL!");
1349  }
1350  delete plan->plan_reverse;
1351  plan->plan_reverse = NULL;
1352 }
1353 
1354 int Zoltan2_Directory_Comm::create_reverse_plan(
1355  int tag,
1356  const Teuchos::ArrayRCP<int> &sizes)/* variable size of objects (if not size 0) */
1357 {
1358  /* Check input parameters */
1359  if (!plan_forward){
1360  throw std::logic_error("memory error");
1361  }
1362 
1363  /* Let Zoltan_Comm_Do check the remaining parameters. */
1364  plan_forward->plan_reverse = new Zoltan2_Directory_Plan;
1365  plan_forward->plan_reverse->getInvertedValues(plan_forward);
1366 
1367  if (MPI_RECV_LIMIT > 0){
1368  /* If we have a limit to the number of posted receives we are allowed,
1369  ** and our plan has exceeded that, then switch to an MPI_Alltoallv so
1370  ** that we will have fewer receives posted when we do the communication.
1371  */
1372  int global_nsends;
1373  Teuchos::reduceAll<int>(*plan_forward->comm, Teuchos::REDUCE_SUM, 1,
1374  &plan_forward->nsends, &global_nsends);
1375  if (global_nsends > MPI_RECV_LIMIT){
1376  plan_forward->plan_reverse->maxed_recvs = 1;
1377  }
1378  }
1379 
1380  if (plan_forward->plan_reverse->maxed_recvs == 0) {
1381  // See notes in header for MPI_Request
1382  // plan_forward->plan_reverse->request = Teuchos::arcp(new MPI_Request[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
1383  // plan_forward->plan_reverse->status = Teuchos::arcp(new MPI_Status[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
1384  plan_forward->plan_reverse->request.resize(plan_forward->plan_reverse->nrecvs);
1385  plan_forward->plan_reverse->status.resize(plan_forward->plan_reverse->nrecvs);
1386  }
1387 
1388  int sum_recv_sizes;
1389  int comm_flag = resize( plan_forward->plan_reverse,
1390  sizes, tag, &sum_recv_sizes);
1391 
1392  if (comm_flag != 0) {
1393  return(comm_flag);
1394  }
1395 
1396  if (sum_recv_sizes != plan_forward->plan_reverse->total_recv_size){
1397  /* Sanity check */
1398  return 1;
1399  }
1400 
1401  return 0;
1402 }
1403 
1404 int Zoltan2_Directory_Comm::resize(
1405  const Teuchos::ArrayRCP<int> &sizes, /* size of each item I'm sending */
1406  int tag, /* message tag I can use */
1407  int *sum_recv_sizes) /* sum of the sizes of the items I'll receive */
1408 {
1409  return resize(plan_forward, sizes, tag, sum_recv_sizes);
1410 }
1411 
1412 int Zoltan2_Directory_Comm::resize(
1413  Zoltan2_Directory_Plan *plan, /* communication plan object */
1414  const Teuchos::ArrayRCP<int> &sizes, /* size of each item I'm sending */
1415  int tag, /* message tag I can use */
1416  int *sum_recv_sizes) /* sum of the sizes of the items I'll receive */
1417 {
1418  /* If sizes vary, then I need to compute and communicate message lengths */
1419  /* First check if sizes array is NULL on all procs. */
1420  int my_proc = plan->comm->getRank(); /* my processor ID */
1421  int has_sizes = (sizes.size() != 0);
1422  int var_sizes; /* items have variable sizes? */
1423 
1424  // I think we'll need to do this raw, not with Teuchos
1425  MPI_Allreduce(&has_sizes, &var_sizes, 1, MPI_INT, MPI_LOR, getRawComm());
1426 
1427  if (var_sizes && plan->indices_from != Teuchos::null) {
1428  // NEW METHOD
1429  // Allow this to run now - the below implementation is working but perhaps
1430  // not done correctly for other usage cases I have not considered yet.
1431  // throw std::logic_error("Non-blocked, variable-sized recvs not supported");
1432  }
1433 
1434  int nsends = plan->nsends; /* number of msgs I'll send */
1435  int nrecvs = plan->nrecvs; /* number of msgs I'll recv */
1436  int self_msg = plan->self_msg;
1437 
1438  Teuchos::ArrayRCP<int> sizes_to;
1439  Teuchos::ArrayRCP<int> sizes_from;
1440  Teuchos::ArrayRCP<int> starts_to_ptr;
1441  Teuchos::ArrayRCP<int> starts_from_ptr;
1442  Teuchos::ArrayRCP<int> indices_to_ptr;
1443  Teuchos::ArrayRCP<int> indices_from_ptr;
1444 
1445  if (!var_sizes) { /* Easy case. Size = length */
1446  plan->total_recv_size = 0;
1447  for (int i = 0; i < nrecvs + self_msg; i++) {
1448  plan->total_recv_size += plan->lengths_from[i];
1449  }
1450 
1451  plan->max_send_size = 0;
1452  for (int i = 0; i < nsends + self_msg; i++) {
1453  if (plan->procs_to[i] != my_proc &&
1454  plan->lengths_to[i] > plan->max_send_size) {
1455  plan->max_send_size = plan->lengths_to[i];
1456  }
1457  }
1458  }
1459  else { /* Need to actually compute message sizes */
1460 
1461  // TODO Investigate purpose of the +1 in the old code. Is that used?
1462 
1463  // OLD CODE
1464  // plan->sizes.resize(plan->nvals + 1);
1465  // for (int i = 0; i < plan->nvals; i++) {
1466  // plan->sizes[i] = sizes[i];
1467  // }
1468 
1469  // NEW CODE
1470  plan->sizes = sizes; // can we just copy?
1471  plan->using_sizes = true;
1472 
1473  if(nsends + self_msg > 0) {
1474  sizes_to = Teuchos::arcp(
1475  new int[nsends + self_msg], 0, nsends + self_msg, true);
1476  for(int n = 0; n < sizes_to.size(); ++n) {
1477  sizes_to[n] = 0;
1478  }
1479  }
1480  if(nrecvs + self_msg > 0) {
1481  sizes_from = Teuchos::arcp(
1482  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1483  }
1484 
1485  /* Several cases:
1486  1. indices_to == NULL
1487  => starts_to != NULL, need to allocate, set starts_to_ptr
1488  2. indices_to != NULL (=> starts_to == NULL)
1489  need to allocate, set indices_to_ptr
1490  3,4. mirror cases for _from
1491  */
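// Worked example for case 1 below (illustration only): a single outgoing
// message with starts_to = {0}, lengths_to = {4} and item sizes = {2, 3, 1, 4}
// yields sizes_to[0] = 10 and starts_to_ptr[0] = 0; if the destination is not
// this rank, max_send_size becomes 10.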
1492  if(nsends + self_msg > 0) {
1493  starts_to_ptr = Teuchos::arcp(
1494  new int[nsends + self_msg], 0, nsends + self_msg, true);
1495  }
1496  if (plan->indices_to == Teuchos::null) {
1497  /* Simpler case; sends already blocked by processor */
1498  Teuchos::ArrayRCP<int> index;
1499  Teuchos::ArrayRCP<int> sort_val;
1500  if(nsends + self_msg > 0) {
1501  index = Teuchos::arcp(
1502  new int[nsends + self_msg], 0, nsends + self_msg, true);
1503  sort_val = Teuchos::arcp(
1504  new int[nsends + self_msg], 0, nsends + self_msg, true);
1505  }
1506  for (int i = 0; i < nsends + self_msg; i++) {
1507  int j = plan->starts_to[i];
1508 
1509  for (int k = 0; k < plan->lengths_to[i]; k++) {
1510  sizes_to[i] += sizes[j++];
1511  }
1512  if (sizes_to[i] > plan->max_send_size &&
1513  plan->procs_to[i] != my_proc)
1514  plan->max_send_size = sizes_to[i];
1515  }
1516  for (int i = 0; i < nsends + self_msg; i++) {
1517  sort_val[i] = plan->starts_to[i];
1518  index[i] = i;
1519  }
1520  sort_ints(sort_val, index);
1521  int sum = 0;
1522  for (int i = 0; i < nsends + self_msg; i++) {
1523  starts_to_ptr[index[i]] = sum;
1524  sum += sizes_to[index[i]];
1525  }
1526  }
1527  else { /* Harder case, sends not blocked */
1528  Teuchos::ArrayRCP<int> offset;
1529  if(plan->nvals > 0) {
1530  offset = Teuchos::arcp(new int[plan->nvals], 0, plan->nvals, true);
1531  }
1532  indices_to_ptr.resize(plan->nvals);
1533 
1534  /* Compute address for every item in send array */
1535  int sum = 0;
1536  for (int i = 0; i < plan->nvals; i++) {
1537  offset[i] = sum;
1538  sum += sizes[i];
1539  }
1540 
1541  sum = 0;
1542  plan->max_send_size = 0;
1543  for (int i = 0; i < nsends + self_msg; i++) {
1544  starts_to_ptr[i] = sum;
1545  int j = plan->starts_to[i];
1546  for (int k = 0; k < plan->lengths_to[i]; k++) {
1547  indices_to_ptr[j] = offset[plan->indices_to[j]];
1548  sizes_to[i] += sizes[plan->indices_to[j++]];
1549  }
1550  if (sizes_to[i] > plan->max_send_size &&
1551  plan->procs_to[i] != my_proc)
1552  plan->max_send_size = sizes_to[i];
1553  sum += sizes_to[i];
1554  }
1555  }
1556 
1557  /* Note: This routine only gets message sizes, not object sizes. */
1558  /* Anything requiring item sizes requires more code */
1559  /* Should such functionality reside here? */
1560  exchange_sizes(sizes_to, plan->procs_to, nsends, self_msg,
1561  sizes_from, plan->procs_from, nrecvs,
1562  &plan->total_recv_size, my_proc, tag, plan->comm);
1563 
1564  if(nrecvs + self_msg > 0) {
1565  starts_from_ptr = Teuchos::arcp(
1566  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1567  }
1568 
1569  if (plan->indices_from == Teuchos::null) {
1570  /* Simpler case; recvs already blocked by processor */
1571  Teuchos::ArrayRCP<int> index;
1572  Teuchos::ArrayRCP<int> sort_val;
1573  if(nrecvs + self_msg > 0) {
1574  index = Teuchos::arcp(
1575  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1576  sort_val = Teuchos::arcp<int>(
1577  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1578  }
1579 
1580  for (int i = 0; i < nrecvs + self_msg; i++) {
1581  sort_val[i] = plan->starts_from[i];
1582  index[i] = i;
1583  }
1584  sort_ints(sort_val, index);
1585 
1586  int sum = 0;
1587  for (int i = 0; i < nrecvs + self_msg; i++) {
1588  starts_from_ptr[index[i]] = sum;
1589  sum += sizes_from[index[i]];
1590  }
1591  }
1592  else {
1593  // OLD COMMENT left here for reference but to be deleted
1594  /* Harder case, recvs not blocked */
1595  /* Not currently supported */
1596  /* Can't do w/o individual item sizes */
1597  /* Currently checked for at top of file */
1598 
1599  // NEW METHOD
1600  // Note this is currently just a duplicate of the block above, which works
1601  // because do_wait is set up to copy the entire block. However, I am not
1602  // sure yet how to organize this implementation and suspect it may not be
1603  // correct anyway - though it seems to work. After do_wait copies the data
1604  // (out of order), the directory handles resorting it, since each element
1605  // carries an index indicating where it goes in the array. In the original
1606  // do_wait implementation it seemed to be set up to place the elements in
1607  // order, even though they would be sorted later, so that part needs
1608  // further discussion.
1609  Teuchos::ArrayRCP<int> index;
1610  Teuchos::ArrayRCP<int> sort_val;
1611  if(nrecvs + self_msg > 0) {
1612  index = Teuchos::arcp(
1613  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1614  sort_val = Teuchos::arcp(
1615  new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1616  }
1617 
1618  for (int i = 0; i < nrecvs + self_msg; i++) {
1619  sort_val[i] = plan->starts_from[i];
1620  index[i] = i;
1621  }
1622  sort_ints(sort_val, index);
1623 
1624  int sum = 0;
1625  for (int i = 0; i < nrecvs + self_msg; i++) {
1626  starts_from_ptr[index[i]] = sum;
1627  sum += sizes_from[index[i]];
1628  }
1629  }
1630  }
1631  plan->sizes_to = sizes_to;
1632  plan->sizes_from = sizes_from;
1633  plan->starts_to_ptr = starts_to_ptr;
1634  plan->starts_from_ptr = starts_from_ptr;
1635  plan->indices_to_ptr = indices_to_ptr;
1636  plan->indices_from_ptr = indices_from_ptr;
1637 
1638  if (sum_recv_sizes) {
1639  *sum_recv_sizes = plan->total_recv_size;
1640  }
1641 
1642  return 0;
1643 }
1644 
1645 int Zoltan2_Directory_Comm::exchange_sizes(
1646  const Teuchos::ArrayRCP<int> &sizes_to, /* value I need to exchange (size of true msg) */
1647  const Teuchos::ArrayRCP<int> &procs_to, /* procs I send to */
1648  int nsends, /* number of messages I'll send */
1649  int self_msg, /* do I copy data to myself? */
1650  Teuchos::ArrayRCP<int> &sizes_from, /* (returned) size of all my receives */
1651  const Teuchos::ArrayRCP<int> &procs_from, /* procs I recv from */
1652  int nrecvs, /* number of messages I receive */
1653  int *total_recv_size, /* (returned) sum of all incoming sizes */
1654  int my_proc, /* my processor number */
1655  int tag, /* message tag I can use */
1656  Teuchos::RCP<const Teuchos::Comm<int> > /* comm */) { /* communicator */
1657 
1658  /* If sizes vary, then I need to communicate message lengths */
1659  int self_index_to = -1; /* location of self in procs_to */
1660  for (int i = 0; i < nsends + self_msg; i++) {
1661  if (procs_to[i] != my_proc) {
1662  // TODO: Teuchos::send(...)
1663  MPI_Send((void *) &sizes_to[i], 1, MPI_INT, procs_to[i], tag, getRawComm());
1664  }
1665  else {
1666  self_index_to = i;
1667  }
1668  }
1669 
1670  *total_recv_size = 0;
1671 
1672  for (int i = 0; i < nrecvs + self_msg; i++) {
1673  if (procs_from[i] != my_proc) {
1674  MPI_Status status; /* status of communication operation */
1675  // TODO: Teuchos::receive(...)
1676  MPI_Recv((void *) &(sizes_from[i]), 1, MPI_INT, procs_from[i],
1677  tag, getRawComm(), &status);
1678  }
1679  else {
1680  sizes_from[i] = sizes_to[self_index_to];
1681  }
1682  *total_recv_size += sizes_from[i];
1683  }
1684  return 0;
1685 }
1686 
1687 } // end namespace Zoltan2