COMBINATORIAL_BLAS  1.6
SpParHelper.cpp
1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc --------------------------*/
6 /****************************************************************/
7 /*
8  Copyright (c) 2010-2017, The Regents of the University of California
9 
10  Permission is hereby granted, free of charge, to any person obtaining a copy
11  of this software and associated documentation files (the "Software"), to deal
12  in the Software without restriction, including without limitation the rights
13  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  copies of the Software, and to permit persons to whom the Software is
15  furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included in
18  all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  THE SOFTWARE.
27  */
28 
29 
30 #include "usort/parUtils.h"
31 
32 namespace combblas {
33 
34 template <typename IT>
35 void SpParHelper::ReDistributeToVector(int* & map_scnt, std::vector< std::vector< IT > > & locs_send, std::vector< std::vector< std::string > > & data_send,
36  std::vector<std::array<char, MAXVERTNAME>> & distmapper_array, const MPI_Comm & comm)
37 {
38  int nprocs, myrank;
39  MPI_Comm_size(comm, &nprocs);
40  MPI_Comm_rank(comm, &myrank);
41 
42  int * map_rcnt = new int[nprocs];
43  MPI_Alltoall(map_scnt, 1, MPI_INT, map_rcnt, 1, MPI_INT, comm);
44  int * map_sdspl = new int[nprocs]();
45  int * map_rdspl = new int[nprocs]();
46  std::partial_sum(map_scnt, map_scnt+nprocs-1, map_sdspl+1);
47  std::partial_sum(map_rcnt, map_rcnt+nprocs-1, map_rdspl+1);
48  IT totmapsend = map_sdspl[nprocs-1] + map_scnt[nprocs-1];
49  IT totmaprecv = map_rdspl[nprocs-1] + map_rcnt[nprocs-1];
50 
51  // sendbuf is a pointer to array of MAXVERTNAME chars.
52  // Explicit grouping syntax is due to precedence of [] over *
53  // char* sendbuf[MAXVERTNAME] would have declared a MAXVERTNAME-length array of char pointers
54  char (*sendbuf)[MAXVERTNAME]; // each sendbuf[i] is type char[MAXVERTNAME]
55  sendbuf = (char (*)[MAXVERTNAME]) malloc(sizeof(char[MAXVERTNAME])* totmapsend); // notice that this is allocating a contiguous block of memory
56 
57  IT * sendinds = new IT[totmapsend];
58  for(int i=0; i<nprocs; ++i)
59  {
60  int loccnt = 0;
61  for(std::string s:data_send[i])
62  {
63  std::strcpy(sendbuf[map_sdspl[i]+loccnt], s.c_str());
64  loccnt++;
65  }
66  std::vector<std::string>().swap(data_send[i]); // free memory
67  }
68  for(int i=0; i<nprocs; ++i) // copy the send indices; by construction the received indices will arrive sorted
69  {
70  std::copy(locs_send[i].begin(), locs_send[i].end(), sendinds+map_sdspl[i]);
71  std::vector<IT>().swap(locs_send[i]); // free memory
72  }
73 
74  char (*recvbuf)[MAXVERTNAME]; // recvbuf is of type char (*)[MAXVERTNAME]
75  recvbuf = (char (*)[MAXVERTNAME]) malloc(sizeof(char[MAXVERTNAME])* totmaprecv);
76 
77  MPI_Datatype MPI_STRING; // this is not necessary (we could just use char) but easier for bookkeeping
78  MPI_Type_contiguous(sizeof(char[MAXVERTNAME]), MPI_CHAR, &MPI_STRING);
79  MPI_Type_commit(&MPI_STRING);
80 
81  MPI_Alltoallv(sendbuf, map_scnt, map_sdspl, MPI_STRING, recvbuf, map_rcnt, map_rdspl, MPI_STRING, comm);
82  free(sendbuf); // can't delete[] so use free
83  MPI_Type_free(&MPI_STRING);
84 
85  IT * recvinds = new IT[totmaprecv];
86  MPI_Alltoallv(sendinds, map_scnt, map_sdspl, MPIType<IT>(), recvinds, map_rcnt, map_rdspl, MPIType<IT>(), comm);
87  DeleteAll(sendinds, map_scnt, map_sdspl, map_rcnt, map_rdspl);
88 
89  if(!std::is_sorted(recvinds, recvinds+totmaprecv))
90  std::cout << "Assertion failed at proc " << myrank << ": Received indices are not sorted, this is unexpected" << std::endl;
91 
92  for(IT i=0; i< totmaprecv; ++i)
93  {
94  assert(i == recvinds[i]);
95  std::copy(recvbuf[i], recvbuf[i]+MAXVERTNAME, distmapper_array[i].begin());
96  }
97  free(recvbuf);
98  delete [] recvinds;
99 }
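The routine above relies on two idioms worth isolating: a contiguous buffer of fixed-width char[MAXVERTNAME] slots, and an MPI_Type_contiguous datatype so that MPI_Alltoallv can treat each slot as a single element. Below is a minimal sketch of that pattern only; the counts, displacements and the width W are hypothetical placeholders and error handling is omitted.

// Sketch only: fixed-width string exchange as used in ReDistributeToVector.
// scnt/sdspl/rcnt/rdspl, totsend and totrecv are assumed to be precomputed.
#include <mpi.h>
#include <cstring>
#include <cstdlib>

void exchange_names_sketch(int * scnt, int * sdspl, int * rcnt, int * rdspl,
                           int totsend, int totrecv, MPI_Comm comm)
{
    const int W = 64;                                       // stands in for MAXVERTNAME
    char (*sendbuf)[W] = (char (*)[W]) malloc(sizeof(char[W]) * totsend);  // contiguous block
    char (*recvbuf)[W] = (char (*)[W]) malloc(sizeof(char[W]) * totrecv);
    std::strcpy(sendbuf[0], "vertex_0");                    // each sendbuf[i] is one char[W] slot (assumes totsend >= 1)

    MPI_Datatype STR;                                       // one element == one W-char slot
    MPI_Type_contiguous(W, MPI_CHAR, &STR);
    MPI_Type_commit(&STR);
    MPI_Alltoallv(sendbuf, scnt, sdspl, STR, recvbuf, rcnt, rdspl, STR, comm);
    MPI_Type_free(&STR);
    free(sendbuf);                                          // malloc'ed, so free() rather than delete[]
    free(recvbuf);
}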
100 
101 
102 template<typename KEY, typename VAL, typename IT>
103 void SpParHelper::MemoryEfficientPSort(std::pair<KEY,VAL> * array, IT length, IT * dist, const MPI_Comm & comm)
104 {
105  int nprocs, myrank;
106  MPI_Comm_size(comm, &nprocs);
107  MPI_Comm_rank(comm, &myrank);
108  int nsize = nprocs / 2; // new size
109  if(nprocs < 10000)
110  {
111  bool excluded = false;
112  if(dist[myrank] == 0) excluded = true;
113 
114  int nreals = 0;
115  for(int i=0; i< nprocs; ++i)
116  if(dist[i] != 0) ++nreals;
117 
118  //SpParHelper::MemoryEfficientPSort(vecpair, nnz, dist, World);
119 
120  if(nreals == nprocs) // general case
121  {
122  long * dist_in = new long[nprocs];
123  for(int i=0; i< nprocs; ++i) dist_in[i] = (long) dist[i];
124  vpsort::parallel_sort (array, array+length, dist_in, comm);
125  delete [] dist_in;
126  }
127  else
128  {
129  long * dist_in = new long[nreals];
130  int * dist_out = new int[nprocs-nreals]; // ranks to exclude
131  int indin = 0;
132  int indout = 0;
133  for(int i=0; i< nprocs; ++i)
134  {
135  if(dist[i] == 0)
136  dist_out[indout++] = i;
137  else
138  dist_in[indin++] = (long) dist[i];
139  }
140 
141  #ifdef DEBUG
142  std::ostringstream outs;
143  outs << "To exclude indices: ";
144  std::copy(dist_out, dist_out+indout, std::ostream_iterator<int>(outs, " ")); outs << std::endl;
145  SpParHelper::Print(outs.str());
146  #endif
147 
148  MPI_Group sort_group, real_group;
149  MPI_Comm_group(comm, &sort_group);
150  MPI_Group_excl(sort_group, indout, dist_out, &real_group);
151  MPI_Group_free(&sort_group);
152 
153  // MPI_Comm_create() must be called by all processes in comm,
154  // even those that do not belong to the new group; for them real_comm is set to MPI_COMM_NULL
155  // int MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newcomm)
156  MPI_Comm real_comm;
157  MPI_Comm_create(comm, real_group, &real_comm);
158  if(!excluded)
159  {
160  vpsort::parallel_sort (array, array+length, dist_in, real_comm);
161  MPI_Comm_free(&real_comm);
162  }
163  MPI_Group_free(&real_group);
164  delete [] dist_in;
165  delete [] dist_out;
166  }
167  }
168  else
169  {
170  IT gl_median = std::accumulate(dist, dist+nsize, static_cast<IT>(0)); // global rank of the first element of the median processor
171  sort(array, array+length); // re-sort because we might have swapped data in previous iterations
172  int color = (myrank < nsize)? 0: 1;
173 
174  std::pair<KEY,VAL> * low = array;
175  std::pair<KEY,VAL> * upp = array;
176  GlobalSelect(gl_median, low, upp, array, length, comm);
177  BipartiteSwap(low, array, length, nsize, color, comm);
178 
179  if(color == 1) dist = dist + nsize; // adjust for the second half of processors
180 
181  // recursive call: an implicit 'spawn' where each half of the processors continues with different parameters
182  // int MPI_Comm_split(MPI_Comm comm, int color, int key, MPI_Comm *newcomm)
183 
184  MPI_Comm halfcomm;
185  MPI_Comm_split(comm, color, myrank, &halfcomm); // split into two communicators
186  MemoryEfficientPSort(array, length, dist, halfcomm);
187  }
188 
189 }
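Both sort routines handle processors that own no data by excluding them from the communicator before calling the underlying sorter. A minimal sketch of that exclusion pattern, with hypothetical inputs nexclude and excl_ranks:

// Sketch: build a communicator without the idle ranks, as done above before
// vpsort::parallel_sort / par::sampleSort. Excluded ranks receive MPI_COMM_NULL.
#include <mpi.h>

MPI_Comm make_active_comm(MPI_Comm comm, int nexclude, int * excl_ranks)
{
    MPI_Group full_group, active_group;
    MPI_Comm_group(comm, &full_group);
    MPI_Group_excl(full_group, nexclude, excl_ranks, &active_group);
    MPI_Group_free(&full_group);

    MPI_Comm active_comm;                       // collective over *all* of comm
    MPI_Comm_create(comm, active_group, &active_comm);
    MPI_Group_free(&active_group);
    return active_comm;                         // caller frees it (when != MPI_COMM_NULL)
}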
190 
191 
192 /*
193  TODO: This function is currently a hack:
194  the payload (VAL) can only be an integer type.
195  Fix this.
196  */
197 template<typename KEY, typename VAL, typename IT>
198 std::vector<std::pair<KEY,VAL>> SpParHelper::KeyValuePSort(std::pair<KEY,VAL> * array, IT length, IT * dist, const MPI_Comm & comm)
199 {
200  int nprocs, myrank;
201  MPI_Comm_size(comm, &nprocs);
202  MPI_Comm_rank(comm, &myrank);
203  int nsize = nprocs / 2; // new size
204 
205 
206 
207  bool excluded = false;
208  if(dist[myrank] == 0) excluded = true;
209 
210  int nreals = 0;
211  for(int i=0; i< nprocs; ++i)
212  if(dist[i] != 0) ++nreals;
213 
214  std::vector<IndexHolder<KEY>> in(length);
215 #ifdef THREADED
216 #pragma omp parallel for
217 #endif
218  for(int i=0; i< length; ++i)
219  {
220  in[i] = IndexHolder<KEY>(array[i].first, static_cast<unsigned long>(array[i].second));
221  }
222 
223  if(nreals == nprocs) // general case
224  {
225  par::sampleSort(in, comm);
226  }
227  else
228  {
229  long * dist_in = new long[nreals];
230  int * dist_out = new int[nprocs-nreals]; // ranks to exclude
231  int indin = 0;
232  int indout = 0;
233  for(int i=0; i< nprocs; ++i)
234  {
235  if(dist[i] == 0)
236  dist_out[indout++] = i;
237  else
238  dist_in[indin++] = (long) dist[i];
239  }
240 
241 #ifdef DEBUG
242  std::ostringstream outs;
243  outs << "To exclude indices: ";
244  std::copy(dist_out, dist_out+indout, std::ostream_iterator<int>(outs, " ")); outs << std::endl;
245  SpParHelper::Print(outs.str());
246 #endif
247 
248  MPI_Group sort_group, real_group;
249  MPI_Comm_group(comm, &sort_group);
250  MPI_Group_excl(sort_group, indout, dist_out, &real_group);
251  MPI_Group_free(&sort_group);
252 
253  // MPI_Comm_create() must be called by all processes in comm,
254  // even those that do not belong to the new group; for them real_comm is set to MPI_COMM_NULL
255  // int MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newcomm)
256  MPI_Comm real_comm;
257  MPI_Comm_create(comm, real_group, &real_comm);
258  if(!excluded)
259  {
260  par::sampleSort(in, real_comm);
261  MPI_Comm_free(&real_comm);
262  }
263  MPI_Group_free(&real_group);
264  delete [] dist_in;
265  delete [] dist_out;
266  }
267 
268  std::vector<std::pair<KEY,VAL>> sorted(in.size());
269  for(int i=0; i<in.size(); i++)
270  {
271  sorted[i].second = static_cast<VAL>(in[i].index);
272  sorted[i].first = in[i].value;
273  }
274  return sorted;
275 }
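KeyValuePSort is easiest to see from the calling side: each rank passes its locally held (key, value) pairs plus a dist array of per-rank counts and gets back its slice of the globally sorted sequence. A hedged usage sketch, with made-up types and the include path as an assumption:

// Hypothetical usage of SpParHelper::KeyValuePSort; adjust the include path to
// your CombBLAS installation. KEY/VAL/IT are all long long here for simplicity,
// consistent with the integer-payload restriction noted in the TODO above.
#include <mpi.h>
#include <vector>
#include <utility>
#include "SpParHelper.h"

void sort_pairs_sketch(std::pair<long long,long long> * pairs, long long length, MPI_Comm comm)
{
    int nprocs;
    MPI_Comm_size(comm, &nprocs);
    std::vector<long long> dist(nprocs);        // per-rank element counts
    MPI_Allgather(&length, 1, MPI_LONG_LONG, dist.data(), 1, MPI_LONG_LONG, comm);

    std::vector<std::pair<long long,long long>> sorted =
        combblas::SpParHelper::KeyValuePSort(pairs, length, dist.data(), comm);
    // 'sorted' now holds this rank's portion of the globally key-sorted sequence
}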
276 
277 
278 template<typename KEY, typename VAL, typename IT>
279 void SpParHelper::GlobalSelect(IT gl_rank, std::pair<KEY,VAL> * & low, std::pair<KEY,VAL> * & upp, std::pair<KEY,VAL> * array, IT length, const MPI_Comm & comm)
280 {
281  int nprocs, myrank;
282  MPI_Comm_size(comm, &nprocs);
283  MPI_Comm_rank(comm, &myrank);
284  IT begin = 0;
285  IT end = length; // initially everyone is active
286  std::pair<KEY, double> * wmminput = new std::pair<KEY,double>[nprocs]; // (median, #{actives})
287 
288  MPI_Datatype MPI_sortType;
289  MPI_Type_contiguous (sizeof(std::pair<KEY,double>), MPI_CHAR, &MPI_sortType);
290  MPI_Type_commit (&MPI_sortType);
291 
292  KEY wmm; // our median pick
293  IT gl_low, gl_upp;
294  IT active = end-begin; // size of the active range
295  IT nacts = 0;
296  bool found = false;
297  int iters = 0;
298 
299  /* changes by shan : to add begin0 and end0 for exit condition */
300  IT begin0, end0;
301  do
302  {
303  iters++;
304  begin0 = begin; end0 = end;
305  KEY median = array[(begin + end)/2].first; // median of the active range
306  wmminput[myrank].first = median;
307  wmminput[myrank].second = static_cast<double>(active);
308  MPI_Allgather(MPI_IN_PLACE, 0, MPI_sortType, wmminput, 1, MPI_sortType, comm);
309  double totact = 0; // total number of active elements
310  for(int i=0; i<nprocs; ++i)
311  totact += wmminput[i].second;
312 
313  // input to weighted median of medians is a set of (object, weight) pairs
314  // the algorithm computes the first set of elements (according to total
315  // order of "object"s), whose sum is still less than or equal to 1/2
316  for(int i=0; i<nprocs; ++i)
317  wmminput[i].second /= totact ; // normalize the weights
318 
319  sort(wmminput, wmminput+nprocs); // sort w.r.t. medians
320  double totweight = 0;
321  int wmmloc=0;
322  while( wmmloc<nprocs && totweight < 0.5 )
323  {
324  totweight += wmminput[wmmloc++].second;
325  }
326 
327  wmm = wmminput[wmmloc-1].first; // weighted median of medians
328 
329  std::pair<KEY,VAL> wmmpair = std::make_pair(wmm, VAL());
330  low =std::lower_bound (array+begin, array+end, wmmpair);
331  upp =std::upper_bound (array+begin, array+end, wmmpair);
332  IT loc_low = low-array; // #{elements smaller than wmm}
333  IT loc_upp = upp-array; // #{elements smaller or equal to wmm}
334 
335  MPI_Allreduce( &loc_low, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm);
336  MPI_Allreduce( &loc_upp, &gl_upp, 1, MPIType<IT>(), MPI_SUM, comm);
337 
338  if(gl_upp < gl_rank)
339  {
340  // our pick was too small; only recurse to the right
341  begin = (low - array);
342  }
343  else if(gl_rank < gl_low)
344  {
345  // our pick was too big; only recurse to the left
346  end = (upp - array);
347  }
348  else
349  {
350  found = true;
351  }
352  active = end-begin;
353  MPI_Allreduce(&active, &nacts, 1, MPIType<IT>(), MPI_SUM, comm);
354  if (begin0 == begin && end0 == end) break; // ABAB: Active range did not shrink, so we break (is this kosher?)
355  }
356  while((nacts > 2*nprocs) && (!found));
357  delete [] wmminput;
358 
359  MPI_Datatype MPI_pairType;
360  MPI_Type_contiguous (sizeof(std::pair<KEY,VAL>), MPI_CHAR, &MPI_pairType);
361  MPI_Type_commit (&MPI_pairType);
362 
363  int * nactives = new int[nprocs];
364  nactives[myrank] = static_cast<int>(active); // At this point, actives are small enough
365  MPI_Allgather(MPI_IN_PLACE, 0, MPI_INT, nactives, 1, MPI_INT, comm);
366  int * dpls = new int[nprocs](); // displacements (zero-initialized, one entry per process)
367  std::partial_sum(nactives, nactives+nprocs-1, dpls+1);
368  std::pair<KEY,VAL> * recvbuf = new std::pair<KEY,VAL>[nacts];
369  low = array + begin; // update low to the beginning of the active range
370  MPI_Allgatherv(low, active, MPI_pairType, recvbuf, nactives, dpls, MPI_pairType, comm);
371 
372  std::pair<KEY,int> * allactives = new std::pair<KEY,int>[nacts];
373  int k = 0;
374  for(int i=0; i<nprocs; ++i)
375  {
376  for(int j=0; j<nactives[i]; ++j)
377  {
378  allactives[k] = std::make_pair(recvbuf[k].first, i);
379  k++;
380  }
381  }
382  DeleteAll(recvbuf, dpls, nactives);
383  sort(allactives, allactives+nacts);
384  MPI_Allreduce(&begin, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm); // update
385  int diff = gl_rank - gl_low;
386  for(int k=0; k < diff; ++k)
387  {
388  if(allactives[k].second == myrank)
389  ++low; // increment the local pointer
390  }
391  delete [] allactives;
392  begin = low-array;
393  MPI_Allreduce(&begin, &gl_low, 1, MPIType<IT>(), MPI_SUM, comm); // update
394 }
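The heart of GlobalSelect is the weighted median-of-medians pick described in the comments above: each processor's local median is weighted by its share of the active elements, and the chosen pivot is the first median whose prefix weight reaches one half. A sequential sketch of just that step:

// Sequential illustration of the weighted median-of-medians step in GlobalSelect.
#include <algorithm>
#include <utility>
#include <vector>

template <typename KEY>
KEY weighted_median_of_medians(std::vector<std::pair<KEY,double>> medians)   // (local median, #actives)
{
    double total = 0;
    for (const auto & p : medians) total += p.second;
    for (auto & p : medians) p.second /= total;          // normalize weights so they sum to 1

    std::sort(medians.begin(), medians.end());           // order by the median keys
    double prefix = 0;
    size_t i = 0;
    while (i < medians.size() && prefix < 0.5)
        prefix += medians[i++].second;                   // walk the prefix until its weight reaches 1/2
    return medians[i-1].first;
}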
395 
396 template<typename KEY, typename VAL, typename IT>
397 void SpParHelper::BipartiteSwap(std::pair<KEY,VAL> * low, std::pair<KEY,VAL> * array, IT length, int nfirsthalf, int color, const MPI_Comm & comm)
398 {
399  int nprocs, myrank;
400  MPI_Comm_size(comm, &nprocs);
401  MPI_Comm_rank(comm, &myrank);
402 
403  IT * firsthalves = new IT[nprocs];
404  IT * secondhalves = new IT[nprocs];
405  firsthalves[myrank] = low-array;
406  secondhalves[myrank] = length - (low-array);
407 
408  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), firsthalves, 1, MPIType<IT>(), comm);
409  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), secondhalves, 1, MPIType<IT>(), comm);
410 
411  int * sendcnt = new int[nprocs](); // zero initialize
412  int totrecvcnt = 0;
413 
414  std::pair<KEY,VAL> * bufbegin = NULL;
415  if(color == 0) // first processor half, only send second half of data
416  {
417  bufbegin = low;
418  totrecvcnt = length - (low-array);
419  IT beg_oftransfer = std::accumulate(secondhalves, secondhalves+myrank, static_cast<IT>(0));
420  IT spaceafter = firsthalves[nfirsthalf];
421  int i=nfirsthalf+1;
422  while(i < nprocs && spaceafter < beg_oftransfer)
423  {
424  spaceafter += firsthalves[i++]; // post-increment
425  }
426  IT end_oftransfer = beg_oftransfer + secondhalves[myrank]; // global index (within second half) of the end of my data
427  IT beg_pour = beg_oftransfer;
428  IT end_pour = std::min(end_oftransfer, spaceafter);
429  sendcnt[i-1] = end_pour - beg_pour;
430  while( i < nprocs && spaceafter < end_oftransfer ) // find other recipients until I run out of data
431  {
432  beg_pour = end_pour;
433  spaceafter += firsthalves[i];
434  end_pour = std::min(end_oftransfer, spaceafter);
435  sendcnt[i++] = end_pour - beg_pour; // post-increment
436  }
437  }
438  else if(color == 1) // second processor half, only send first half of data
439  {
440  bufbegin = array;
441  totrecvcnt = low-array;
442  // global index (within the second processor half) of the beginning of my data
443  IT beg_oftransfer = std::accumulate(firsthalves+nfirsthalf, firsthalves+myrank, static_cast<IT>(0));
444  IT spaceafter = secondhalves[0];
445  int i=1;
446  while( i< nfirsthalf && spaceafter < beg_oftransfer)
447  {
448  //spacebefore = spaceafter;
449  spaceafter += secondhalves[i++]; // post-increment
450  }
451  IT end_oftransfer = beg_oftransfer + firsthalves[myrank]; // global index (within second half) of the end of my data
452  IT beg_pour = beg_oftransfer;
453  IT end_pour = std::min(end_oftransfer, spaceafter);
454  sendcnt[i-1] = end_pour - beg_pour;
455  while( i < nfirsthalf && spaceafter < end_oftransfer ) // find other recipients until I run out of data
456  {
457  beg_pour = end_pour;
458  spaceafter += secondhalves[i];
459  end_pour = std::min(end_oftransfer, spaceafter);
460  sendcnt[i++] = end_pour - beg_pour; // post-increment
461  }
462  }
463  DeleteAll(firsthalves, secondhalves);
464  int * recvcnt = new int[nprocs];
465  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, comm); // get the recv counts
466  // The Alltoall is actually unnecessary, because sendcnt = recvcnt:
467  // if I have n_mine > n_yours data to send, then I can send you only n_yours
468  // as this is your space, and you'll send me an identical amount.
469  // The remaining n_mine - n_yours of my data goes to a third processor,
470  // and that processor can only send n_mine - n_yours back to me.
471  // The proof follows by induction.
472 
473  MPI_Datatype MPI_valueType;
474  MPI_Type_contiguous(sizeof(std::pair<KEY,VAL>), MPI_CHAR, &MPI_valueType);
475  MPI_Type_commit(&MPI_valueType);
476 
477  std::pair<KEY,VAL> * receives = new std::pair<KEY,VAL>[totrecvcnt];
478  int * sdpls = new int[nprocs](); // send displacements (zero-initialized, one entry per process)
479  int * rdpls = new int[nprocs]();
480  std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
481  std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
482 
483  MPI_Alltoallv(bufbegin, sendcnt, sdpls, MPI_valueType, receives, recvcnt, rdpls, MPI_valueType, comm); // sparse swap
484 
485  DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
486  std::copy(receives, receives+totrecvcnt, bufbegin);
487  delete [] receives;
488 }
489 
490 
491 template<typename KEY, typename VAL, typename IT>
492 void SpParHelper::DebugPrintKeys(std::pair<KEY,VAL> * array, IT length, IT * dist, MPI_Comm & World)
493 {
494  int rank, nprocs;
495  MPI_Comm_rank(World, &rank);
496  MPI_Comm_size(World, &nprocs);
497  MPI_File thefile;
498 
499  char _fn[] = "temp_sortedkeys"; // AL: this is to avoid the problem that C++ string literals are const char* while C string literals are char*, leading to a const warning (technically error, but compilers are tolerant)
500  MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
501 
502  // The cast in the last parameter is crucial because the signature of the function is
503  // T accumulate ( InputIterator first, InputIterator last, T init )
504  // Hence if init is of type "int", the output also becomes an int (the return type is deduced from init, not from the iterators)
505  IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
506 
507  MPI_Offset disp = sizeuntil * sizeof(KEY); // displacement is in bytes
508  MPI_File_set_view(thefile, disp, MPIType<KEY>(), MPIType<KEY>(), "native", MPI_INFO_NULL);
509 
510  KEY * packed = new KEY[length];
511  for(int i=0; i<length; ++i)
512  {
513  packed[i] = array[i].first;
514  }
515  MPI_File_write(thefile, packed, length, MPIType<KEY>(), MPI_STATUS_IGNORE);
516  MPI_File_close(&thefile);
517  delete [] packed;
518 
519  // Now let processor-0 read the file and print
520  if(rank == 0)
521  {
522  FILE * f = fopen("temp_sortedkeys", "r");
523  if(!f)
524  {
525  std::cerr << "Problem reading binary input file\n";
526  return;
527  }
528  IT maxd = *std::max_element(dist, dist+nprocs);
529  KEY * data = new KEY[maxd];
530 
531  for(int i=0; i<nprocs; ++i)
532  {
533  // read n_per_proc integers and print them
534  fread(data, sizeof(KEY), dist[i],f);
535 
536  std::cout << "Elements stored on proc " << i << ": " << std::endl;
537  std::copy(data, data+dist[i], std::ostream_iterator<KEY>(std::cout, "\n"));
538  }
539  delete [] data;
540  }
541 }
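The comment about the accumulate cast is a general C++ pitfall: the type of the init argument fixes the accumulator (and return) type. A tiny standalone illustration, with made-up values:

// Why the static_cast<IT>(0) in DebugPrintKeys matters: with an int literal as
// init, every partial sum is forced back into an int.
#include <numeric>
#include <vector>

void accumulate_init_type_example()
{
    std::vector<double> w = {0.5, 0.5, 0.5};
    double truncated = std::accumulate(w.begin(), w.end(), 0);     // int accumulator: result is 0
    double correct   = std::accumulate(w.begin(), w.end(), 0.0);   // double accumulator: result is 1.5
    (void)truncated; (void)correct;
}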
542 
543 
551 template <class IT, class NT, class DER>
552 void SpParHelper::FetchMatrix(SpMat<IT,NT,DER> & MRecv, const std::vector<IT> & essentials, std::vector<MPI_Win> & arrwin, int ownind)
553 {
554  MRecv.Create(essentials); // allocate memory for arrays
555 
556  Arr<IT,NT> arrinfo = MRecv.GetArrays();
557  assert( (arrwin.size() == arrinfo.totalsize()));
558 
559  // C-binding for MPI::Get
560  // int MPI_Get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp,
561  // int target_count, MPI_Datatype target_datatype, MPI_Win win)
562 
563  IT essk = 0;
564  for(int i=0; i< arrinfo.indarrs.size(); ++i) // get index arrays
565  {
566  //arrwin[essk].Lock(MPI::LOCK_SHARED, ownind, 0);
567  MPI_Get( arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), ownind, 0, arrinfo.indarrs[i].count, MPIType<IT>(), arrwin[essk++]);
568  }
569  for(int i=0; i< arrinfo.numarrs.size(); ++i) // get numerical arrays
570  {
571  //arrwin[essk].Lock(MPI::LOCK_SHARED, ownind, 0);
572  MPI_Get(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), ownind, 0, arrinfo.numarrs[i].count, MPIType<NT>(), arrwin[essk++]);
573  }
574 }
575 
576 
582 template<typename IT, typename NT, typename DER>
583 void SpParHelper::BCastMatrix(MPI_Comm & comm1d, SpMat<IT,NT,DER> & Matrix, const std::vector<IT> & essentials, int root)
584 {
585  int myrank;
586  MPI_Comm_rank(comm1d, &myrank);
587  if(myrank != root)
588  {
589  Matrix.Create(essentials); // allocate memory for arrays
590  }
591 
592  Arr<IT,NT> arrinfo = Matrix.GetArrays();
593  for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i) // broadcast index arrays
594  {
595  MPI_Bcast(arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), root, comm1d);
596  }
597  for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i) // broadcast numerical arrays
598  {
599  MPI_Bcast(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), root, comm1d);
600  }
601 }
602 
603 
611 template<typename IT, typename NT, typename DER>
612 void SpParHelper::GatherMatrix(MPI_Comm & comm1d, SpMat<IT,NT,DER> & Matrix, int root)
613 {
614  int myrank, nprocs;
615  MPI_Comm_rank(comm1d, &myrank);
616  MPI_Comm_size(comm1d,&nprocs);
617 
618  /*
619  if(myrank != root)
620  {
621  Matrix.Create(essentials); // allocate memory for arrays
622  }
623  */
624 
625  Arr<IT,NT> arrinfo = Matrix.GetArrays();
626  std::vector<std::vector<int>> recvcnt_ind(arrinfo.indarrs.size());
627  std::vector<std::vector<int>> recvcnt_num(arrinfo.numarrs.size());
628  for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i) // gather index array counts
629  {
630  recvcnt_ind[i].resize(nprocs);
631  int lcount = (int)arrinfo.indarrs[i].count;
632  MPI_Gather(&lcount, 1, MPI_INT, recvcnt_ind[i].data(),1, MPI_INT, root, comm1d);
633  }
634  for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i) // gather numerical array counts
635  {
636  recvcnt_num[i].resize(nprocs);
637  int lcount = (int) arrinfo.numarrs[i].count;
638  MPI_Gather(&lcount, 1, MPI_INT, recvcnt_num[i].data(),1, MPI_INT, root, comm1d);
639  }
640 
641  // now gather the actual vector
642  std::vector<std::vector<int>> recvdsp_ind(arrinfo.indarrs.size());
643  std::vector<std::vector<int>> recvdsp_num(arrinfo.numarrs.size());
644  std::vector<std::vector<IT>> recvind(arrinfo.indarrs.size());
645  std::vector<std::vector<IT>> recvnum(arrinfo.numarrs.size());
646  for(unsigned int i=0; i< arrinfo.indarrs.size(); ++i) // gather index arrays
647  {
648  recvdsp_ind[i].resize(nprocs);
649  recvdsp_ind[i][0] = 0;
650  for(int j=1; j<nprocs; j++)
651  recvdsp_ind[i][j] = recvdsp_ind[i][j-1] + recvcnt_ind[i][j-1];
652  recvind[i].resize(recvdsp_ind[i][nprocs-1] + recvcnt_ind[i][nprocs-1]);
653  MPI_Gatherv(arrinfo.indarrs[i].addr, arrinfo.indarrs[i].count, MPIType<IT>(), recvind[i].data(),recvcnt_ind[i].data(), recvdsp_ind[i].data(), MPIType<IT>(), root, comm1d);
654  }
655 
656 
657  for(unsigned int i=0; i< arrinfo.numarrs.size(); ++i) // gather num arrays
658  {
659  recvdsp_num[i].resize(nprocs);
660  recvdsp_num[i][0] = 0;
661  for(int j=1; j<nprocs; j++)
662  recvdsp_num[i][j] = recvdsp_num[i][j-1] + recvcnt_num[i][j-1];
663  recvnum[i].resize(recvdsp_num[i][nprocs-1] + recvcnt_num[i][nprocs-1]);
664  MPI_Gatherv(arrinfo.numarrs[i].addr, arrinfo.numarrs[i].count, MPIType<NT>(), recvnum[i].data(),recvcnt_num[i].data(), recvdsp_num[i].data(), MPIType<NT>(), root, comm1d);
665  }
666 }
667 
668 
669 template <class IT, class NT, class DER>
670 void SpParHelper::SetWindows(MPI_Comm & comm1d, const SpMat< IT,NT,DER > & Matrix, std::vector<MPI_Win> & arrwin)
671 {
672  Arr<IT,NT> arrs = Matrix.GetArrays();
673 
674  // int MPI_Win_create(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, MPI_Win *win)
675  // The displacement unit argument is provided to facilitate address arithmetic in RMA operations
676  // *** COLLECTIVE OPERATION ***, everybody exposes its own array to everyone else in the communicator
677 
678  for(int i=0; i< arrs.indarrs.size(); ++i)
679  {
680  MPI_Win nWin;
681  MPI_Win_create(arrs.indarrs[i].addr,
682  arrs.indarrs[i].count * sizeof(IT), sizeof(IT), MPI_INFO_NULL, comm1d, &nWin);
683  arrwin.push_back(nWin);
684  }
685  for(int i=0; i< arrs.numarrs.size(); ++i)
686  {
687  MPI_Win nWin;
688  MPI_Win_create(arrs.numarrs[i].addr,
689  arrs.numarrs[i].count * sizeof(NT), sizeof(NT), MPI_INFO_NULL, comm1d, &nWin);
690  arrwin.push_back(nWin);
691  }
692 }
693 
694 inline void SpParHelper::LockWindows(int ownind, std::vector<MPI_Win> & arrwin)
695 {
696  for(std::vector<MPI_Win>::iterator itr = arrwin.begin(); itr != arrwin.end(); ++itr)
697  {
698  MPI_Win_lock(MPI_LOCK_SHARED, ownind, 0, *itr);
699  }
700 }
701 
702 inline void SpParHelper::UnlockWindows(int ownind, std::vector<MPI_Win> & arrwin)
703 {
704  for(std::vector<MPI_Win>::iterator itr = arrwin.begin(); itr != arrwin.end(); ++itr)
705  {
706  MPI_Win_unlock( ownind, *itr);
707  }
708 }
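LockWindows and UnlockWindows implement the passive-target side of the RMA machinery: lock the owner's window in shared mode, issue one-sided reads, and let the unlock complete them (this is what LockNFetch does via FetchMatrix). A compact sketch with assumed inputs win and owner:

// Sketch of the passive-target lock/get/unlock pattern behind LockNFetch.
#include <mpi.h>

void passive_fetch_sketch(MPI_Win win, int owner, long * localbuf, int count)
{
    MPI_Win_lock(MPI_LOCK_SHARED, owner, 0, win);                        // begin access epoch at 'owner'
    MPI_Get(localbuf, count, MPI_LONG, owner, 0, count, MPI_LONG, win);  // one-sided read from owner's window
    MPI_Win_unlock(owner, win);                                          // transfer guaranteed complete on return
}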
709 
710 
715 inline void SpParHelper::StartAccessEpoch(int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group)
716 {
717  /* Build the access group containing only the owner, starting from the given group */
718  int acc_ranks[1];
719  acc_ranks[0] = owner;
720  MPI_Group access;
721  MPI_Group_incl(group, 1, acc_ranks, &access); // take only the owner
722 
723  // begin the ACCESS epochs for the arrays of the remote matrices A and B
724  // MPI_Win_start *may* block until all processes in the target group have entered their exposure epoch
725  for(unsigned int i=0; i< arrwin.size(); ++i)
726  MPI_Win_start(access, 0, arrwin[i]);
727 
728  MPI_Group_free(&access);
729 }
730 
734 inline void SpParHelper::PostExposureEpoch(int self, std::vector<MPI_Win> & arrwin, MPI_Group & group)
735 {
736  // begin the EXPOSURE epochs for the arrays of the local matrices A and B
737  for(unsigned int i=0; i< arrwin.size(); ++i)
738  MPI_Win_post(group, MPI_MODE_NOPUT, arrwin[i]);
739 }
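StartAccessEpoch, PostExposureEpoch and (further below) WaitNFree wrap the generalized active-target synchronization of MPI RMA, i.e. the post/start/complete/wait handshake. A sketch of the two sides, with win and the groups assumed to be set up elsewhere:

// Sketch of the PSCW handshake used by AccessNFetch: the origin calls start/get/complete,
// the target calls post/wait. Both sides operate on the same window object.
#include <mpi.h>

void pscw_origin_side(MPI_Win win, MPI_Group targets, int target_rank, long * buf, int count)
{
    MPI_Win_start(targets, 0, win);   // may block until the targets have posted their exposure epoch
    MPI_Get(buf, count, MPI_LONG, target_rank, 0, count, MPI_LONG, win);
    MPI_Win_complete(win);            // the get is only guaranteed complete after this returns
}

void pscw_target_side(MPI_Win win, MPI_Group origins)
{
    MPI_Win_post(origins, MPI_MODE_NOPUT, win);   // expose the local window; no puts expected
    MPI_Win_wait(win);                            // matches MPI_Win_complete on every origin
}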
740 
741 template <class IT, class DER>
742 void SpParHelper::AccessNFetch(DER * & Matrix, int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group, IT ** sizes)
743 {
744  StartAccessEpoch(owner, arrwin, group); // start the access epoch to arrwin of owner
745 
746  std::vector<IT> ess(DER::esscount); // pack essentials to a vector
747  for(int j=0; j< DER::esscount; ++j)
748  ess[j] = sizes[j][owner];
749 
750  Matrix = new DER(); // create the object first
751  FetchMatrix(*Matrix, ess, arrwin, owner); // then start fetching its elements
752 }
753 
754 template <class IT, class DER>
755 void SpParHelper::LockNFetch(DER * & Matrix, int owner, std::vector<MPI_Win> & arrwin, MPI_Group & group, IT ** sizes)
756 {
757  LockWindows(owner, arrwin);
758 
759  std::vector<IT> ess(DER::esscount); // pack essentials to a vector
760  for(int j=0; j< DER::esscount; ++j)
761  ess[j] = sizes[j][owner];
762 
763  Matrix = new DER(); // create the object first
764  FetchMatrix(*Matrix, ess, arrwin, owner); // then start fetching its elements
765 }
766 
772 template <class IT, class NT, class DER>
773 void SpParHelper::GetSetSizes(const SpMat<IT,NT,DER> & Matrix, IT ** & sizes, MPI_Comm & comm1d)
774 {
775  std::vector<IT> essentials = Matrix.GetEssentials();
776  int index;
777  MPI_Comm_rank(comm1d, &index);
778 
779  for(IT i=0; (unsigned)i < essentials.size(); ++i)
780  {
781  sizes[i][index] = essentials[i];
782  MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), sizes[i], 1, MPIType<IT>(), comm1d);
783  }
784 }
785 
786 inline void SpParHelper::PrintFile(const std::string & s, const std::string & filename)
787 {
788  int myrank;
789  MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
790  if(myrank == 0)
791  {
792  std::ofstream out(filename.c_str(), std::ofstream::app);
793  out << s;
794  out.close();
795  }
796 }
797 
798 inline void SpParHelper::PrintFile(const std::string & s, const std::string & filename, MPI_Comm & world)
799 {
800  int myrank;
801  MPI_Comm_rank(world, &myrank);
802  if(myrank == 0)
803  {
804  std::ofstream out(filename.c_str(), std::ofstream::app);
805  out << s;
806  out.close();
807  }
808 }
809 
810 
811 inline void SpParHelper::Print(const std::string & s)
812 {
813  int myrank;
814  MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
815  if(myrank == 0)
816  {
817  std::cerr << s;
818  }
819 }
820 
821 inline void SpParHelper::Print(const std::string & s, MPI_Comm & world)
822 {
823  int myrank;
824  MPI_Comm_rank(world, &myrank);
825  if(myrank == 0)
826  {
827  std::cerr << s;
828  }
829 }
830 
831 
832 inline void SpParHelper::check_newline(int *bytes_read, int bytes_requested, char *buf)
833 {
834  if ((*bytes_read) < bytes_requested) {
835  // fewer bytes than expected, this means EOF
836  if (buf[(*bytes_read) - 1] != '\n') {
837  // doesn't terminate with a newline, add one to prevent infinite loop later
838  buf[(*bytes_read) - 1] = '\n';
839  std::cout << "Error in Matrix Market format, appending missing newline at end of file" << std::endl;
840  (*bytes_read)++;
841  }
842  }
843 }
844 
845 
846 inline bool SpParHelper::FetchBatch(MPI_File & infile, MPI_Offset & curpos, MPI_Offset end_fpos, bool firstcall, std::vector<std::string> & lines, int myrank)
847 {
848  size_t bytes2fetch = ONEMILLION; // we might read more than needed but no problem as we won't process them
849  MPI_Status status;
850  int bytes_read;
851  if(firstcall && myrank != 0)
852  {
853  curpos -= 1; // first byte is to check whether we started at the beginning of a line
854  bytes2fetch += 1;
855  }
856  char * buf = new char[bytes2fetch]; // needs to happen **after** bytes2fetch is updated
857  char * originalbuf = buf; // so that we can delete it later because "buf" will move
858 
859  MPI_File_read_at(infile, curpos, buf, bytes2fetch, MPI_CHAR, &status);
860  MPI_Get_count(&status, MPI_CHAR, &bytes_read); // MPI_Get_count returns the count as a 32-bit int
861  if(!bytes_read)
862  {
863  delete [] originalbuf;
864  return true; // done
865  }
866  SpParHelper::check_newline(&bytes_read, bytes2fetch, buf);
867  if(firstcall && myrank != 0)
868  {
869  if(buf[0] == '\n') // we got super lucky and hit the line break
870  {
871  buf += 1;
872  bytes_read -= 1;
873  curpos += 1;
874  }
875  else // skip to the next line and let the preceding processor take care of this partial line
876  {
877  char *c = (char*)memchr(buf, '\n', MAXLINELENGTH); // return a pointer to the matching byte or NULL if the character does not occur
878  if (c == NULL) {
879  std::cout << "Unexpected line without a break" << std::endl;
880  }
881  int n = c - buf + 1;
882  bytes_read -= n;
883  buf += n;
884  curpos += n;
885  }
886  }
887  while(bytes_read > 0 && curpos < end_fpos) // this will also finish the last line
888  {
889  char *c = (char*)memchr(buf, '\n', bytes_read); // return a pointer to the matching byte or NULL if the character does not occur
890  if (c == NULL) {
891  delete [] originalbuf;
892  return false; // if bytes_read stops in the middle of a line, that line will be re-read next time since curpos has not been moved forward yet
893  }
894  int n = c - buf + 1;
895 
896  // string constructor from char * buffer: copies the first n characters from the array of characters pointed by s
897  lines.push_back(std::string(buf, n-1)); // no need to copy the newline character
898  bytes_read -= n; // reduce remaining bytes
899  buf += n; // move forward the buffer
900  curpos += n;
901  }
902  delete [] originalbuf;
903  if (curpos >= end_fpos) return true; // don't call it again, nothing left to read
904  else return false;
905 }
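The core of FetchBatch is the loop that chops a freshly read byte buffer into complete newline-terminated lines and leaves any partial trailing line to be re-read at the next call. A sequential sketch of just that loop:

// Sketch: split 'bytes_read' bytes into complete lines; return how many bytes were
// consumed so the caller can advance its file offset (mirrors the curpos logic above).
#include <cstring>
#include <string>
#include <vector>

size_t chop_into_lines(const char * buf, size_t bytes_read, std::vector<std::string> & lines)
{
    size_t consumed = 0;
    while (consumed < bytes_read)
    {
        const char * c = (const char *) std::memchr(buf + consumed, '\n', bytes_read - consumed);
        if (c == NULL) break;                                  // partial line: leave it for the next read
        size_t n = (size_t)(c - (buf + consumed)) + 1;         // line length including the newline
        lines.push_back(std::string(buf + consumed, n - 1));   // store the line without the newline
        consumed += n;
    }
    return consumed;
}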
906 
907 
908 inline void SpParHelper::WaitNFree(std::vector<MPI_Win> & arrwin)
909 {
910  // End the exposure epochs for the arrays of the local matrices A and B
911  // MPI_Win_wait matches the MPI_Win_complete calls issued by ** EACH OF THE ORIGIN PROCESSES **
912  // that were granted access to the window during this epoch.
913  for(unsigned int i=0; i< arrwin.size(); ++i)
914  {
915  MPI_Win_wait(arrwin[i]);
916  }
917  FreeWindows(arrwin);
918 }
919 
920 inline void SpParHelper::FreeWindows(std::vector<MPI_Win> & arrwin)
921 {
922  for(unsigned int i=0; i< arrwin.size(); ++i)
923  {
924  MPI_Win_free(&arrwin[i]);
925  }
926 }
927 
928 }