COMBINATORIAL_BLAS  1.6
FullyDistSpVec.cpp
1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc --------------------------*/
6 /****************************************************************/
7 /*
8  Copyright (c) 2010-2017, The Regents of the University of California
9 
10  Permission is hereby granted, free of charge, to any person obtaining a copy
11  of this software and associated documentation files (the "Software"), to deal
12  in the Software without restriction, including without limitation the rights
13  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  copies of the Software, and to permit persons to whom the Software is
15  furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included in
18  all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  THE SOFTWARE.
27  */
28 
29 
30 #include <limits>
31 #include "FullyDistSpVec.h"
32 #include "SpDefs.h"
33 #include "SpHelper.h"
34 #include "hash.hpp"
35 #include "FileHeader.h"
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 
39 #ifdef GNU_PARALLEL
40 #include <parallel/algorithm>
41 #include <parallel/numeric>
42 #endif
43 
44 #include "usort/parUtils.h"
45 
46 namespace combblas {
47 
48 template <class IT, class NT>
49 FullyDistSpVec<IT, NT>::FullyDistSpVec ( std::shared_ptr<CommGrid> grid)
50 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(grid)
51 { };
52 
53 template <class IT, class NT>
54 FullyDistSpVec<IT, NT>::FullyDistSpVec ( std::shared_ptr<CommGrid> grid, IT globallen)
55 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(grid,globallen)
56 { };
57 
58 template <class IT, class NT>
59 FullyDistSpVec<IT, NT>::FullyDistSpVec ()
60 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>()
61 { };
62 
63 template <class IT, class NT>
64 FullyDistSpVec<IT, NT>::FullyDistSpVec (IT globallen)
65 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(globallen)
66 { }
67 
68 
69 template <class IT, class NT>
70 FullyDistSpVec<IT,NT> & FullyDistSpVec<IT, NT>::operator=(const FullyDistSpVec< IT,NT > & rhs)
71 {
72  if(this != &rhs)
73  {
74  FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (rhs); // to update glen and commGrid
75  ind = rhs.ind;
76  num = rhs.num;
77  }
78  return *this;
79 }
80 
81 template <class IT, class NT>
82 FullyDistSpVec<IT,NT>::FullyDistSpVec (const FullyDistVec<IT,NT> & rhs) // Conversion copy-constructor
83 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(rhs.commGrid,rhs.glen)
84 {
85  *this = rhs;
86 }
87 
88 // Conversion copy-constructor where unary op is true
89 template <class IT, class NT>
90 template <typename _UnaryOperation>
91 FullyDistSpVec<IT,NT>::FullyDistSpVec (const FullyDistVec<IT,NT> & rhs, _UnaryOperation unop)
92 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(rhs.commGrid,rhs.glen)
93 {
94  //FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (rhs); // to update glen and commGrid
95 
96  std::vector<IT>().swap(ind);
97  std::vector<NT>().swap(num);
98  IT vecsize = rhs.LocArrSize();
99  for(IT i=0; i< vecsize; ++i)
100  {
101  if(unop(rhs.arr[i]))
102  {
103  ind.push_back(i);
104  num.push_back(rhs.arr[i]);
105  }
106  }
107 }
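// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): the conversion
// constructor above keeps only the dense entries for which the unary predicate returns
// true. `fullWorld` is an assumed grid handle; the values are illustrative.
#ifdef COMBBLAS_DOC_EXAMPLES
static void Example_DenseToSparse(std::shared_ptr<CommGrid> fullWorld)
{
    FullyDistVec<int64_t, double> dense(fullWorld, 1000, 0.0);   // dense vector of zeros
    dense.SetElement(7, 3.5);                                    // make one entry nonzero
    FullyDistSpVec<int64_t, double> sparse(dense, [](double x) { return x != 0.0; }); // keep nonzeros only
}
#endif
// --------------------------------------------------------------------------------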
108 
109 
110 
111 // create a sparse vector from local vectors
112 template <class IT, class NT>
113 FullyDistSpVec<IT,NT>::FullyDistSpVec (std::shared_ptr<CommGrid> grid, IT globallen, const std::vector<IT>& indvec, const std::vector<NT> & numvec, bool SumDuplicates, bool sorted)
114 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(grid, globallen)
115 {
116 
117  assert(indvec.size()==numvec.size());
118  IT vecsize = indvec.size();
119  if(!sorted)
120  {
121  std::vector< std::pair<IT,NT> > tosort(vecsize);
122 #ifdef _OPENMP
123 #pragma omp parallel for
124 #endif
125  for(IT i=0; i<vecsize; ++i)
126  {
127  tosort[i] = std::make_pair(indvec[i], numvec[i]);
128  }
129 
130 #if defined(GNU_PARALLEL) && defined(_OPENMP)
131  __gnu_parallel::sort(tosort.begin(), tosort.end());
132 #else
133  std::sort(tosort.begin(), tosort.end());
134 #endif
135 
136 
137  ind.reserve(vecsize);
138  num.reserve(vecsize);
139  IT lastIndex=-1;
140  for(auto itr = tosort.begin(); itr != tosort.end(); ++itr)
141  {
142  if(lastIndex!=itr->first) //if SumDuplicates=false, keep only the first one
143  {
144  ind.push_back(itr->first);
145  num.push_back(itr->second);
146  lastIndex = itr->first;
147  }
148  else if(SumDuplicates)
149  {
150  num.back() += itr->second;
151  }
152  }
153 
154  }
155  else
156  {
157 
158  ind.reserve(vecsize);
159  num.reserve(vecsize);
160  IT lastIndex=-1;
161 
162  for(IT i=0; i< vecsize; ++i)
163  {
164  if(lastIndex!=indvec[i]) //if SumDuplicates=false, keep only the first one
165  {
166  ind.push_back(indvec[i]);
167  num.push_back(numvec[i]);
168  lastIndex = indvec[i];
169  }
170  else if(SumDuplicates)
171  {
172  num.back() += numvec[i];
173  }
174  }
175  }
176 }
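// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): driving the
// "local vectors" constructor above. The indices are used as given (they are pushed
// into ind unchanged), so they appear to be processor-local offsets; each rank supplies
// its own locally owned entries. Names and values are illustrative.
#ifdef COMBBLAS_DOC_EXAMPLES
static void Example_FromLocalVectors(std::shared_ptr<CommGrid> fullWorld)
{
    std::vector<int64_t> locinds { 4, 0, 4, 9 };        // unsorted, with a duplicate at 4
    std::vector<double>  locvals { 1.0, 2.0, 5.0, 3.0 };
    // sorted = false triggers the sort branch; the duplicate entry at 4 becomes 1.0 + 5.0
    FullyDistSpVec<int64_t, double> v(fullWorld, 1000, locinds, locvals,
                                      /*SumDuplicates*/ true, /*sorted*/ false);
}
#endif
// --------------------------------------------------------------------------------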
177 
178 
179 
180 // ABAB: This function probably operates differently than a user would immediately expect
181 // ABAB: Write a well-posed description for it
182 template <class IT, class NT>
183 FullyDistSpVec<IT,NT> & FullyDistSpVec<IT,NT>::operator=(const FullyDistVec< IT,NT > & rhs) // conversion from dense
184 {
185  FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (rhs); // to update glen and commGrid
186 
187  std::vector<IT>().swap(ind);
188  std::vector<NT>().swap(num);
189  IT vecsize = rhs.LocArrSize();
190  for(IT i=0; i< vecsize; ++i)
191  {
192  // rhs.zero does not exist after CombBLAS 1.2
193  ind.push_back(i);
194  num.push_back(rhs.arr[i]);
195  }
196  return *this;
197 }
198 
199 
200 /************************************************************************
201  * Create a sparse vector from index and value vectors (dense vectors)
202  * FullyDistSpVec v(globalsize, inds, vals):
203  * nnz(v) = size(inds) = size(vals)
204  * size(v) = globallen
205  * if inds has duplicate entries and SumDuplicates is true then we sum
206  * the values of duplicate indices. Otherwise, only the first entry is kept.
207  ************************************************************************/
208 template <class IT, class NT>
209 FullyDistSpVec<IT,NT>::FullyDistSpVec (IT globallen, const FullyDistVec<IT,IT> & inds, const FullyDistVec<IT,NT> & vals, bool SumDuplicates)
210 : FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>(inds.commGrid,globallen)
211 {
212  if(*(inds.commGrid) != *(vals.commGrid))
213  {
214  SpParHelper::Print("Grids are not comparable, FullyDistSpVec() fails !");
215  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
216  }
217  if(inds.TotalLength() != vals.TotalLength())
218  {
219  SpParHelper::Print("Index and value vectors have different sizes, FullyDistSpVec() fails !");
220  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
221  }
222  //commGrid = inds.commGrid;
223  //glen = globallen;
224 
225  IT maxind = inds.Reduce(maximum<IT>(), (IT) 0);
226  if(maxind>=globallen)
227  {
228  SpParHelper::Print("At least one index is greater than globallen, FullyDistSpVec() fails !");
229  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
230  }
231 
232 
233 
234  MPI_Comm World = commGrid->GetWorld();
235  int nprocs = commGrid->GetSize();
236  int * rdispls = new int[nprocs];
237  int * recvcnt = new int[nprocs];
238  int * sendcnt = new int[nprocs](); // initialize to 0
239  int * sdispls = new int[nprocs];
240 
241  // ----- share count --------
242  IT locsize = inds.LocArrSize();
243  for(IT i=0; i<locsize; ++i)
244  {
245  IT locind;
246  int owner = Owner(inds.arr[i], locind);
247  sendcnt[owner]++;
248  }
249  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
250 
251 
252  // ----- compute send and receive displacements --------
253 
254  sdispls[0] = 0;
255  rdispls[0] = 0;
256  for(int i=0; i<nprocs-1; ++i)
257  {
258  sdispls[i+1] = sdispls[i] + sendcnt[i];
259  rdispls[i+1] = rdispls[i] + recvcnt[i];
260  }
261 
262 
263  // ----- prepare data to be sent --------
264 
265  NT * datbuf = new NT[locsize];
266  IT * indbuf = new IT[locsize];
267  int *count = new int[nprocs](); //current position
268  for(IT i=0; i < locsize; ++i)
269  {
270  IT locind;
271  int owner = Owner(inds.arr[i], locind);
272  int id = sdispls[owner] + count[owner];
273  datbuf[id] = vals.arr[i];
274  indbuf[id] = locind;
275  count[owner]++;
276  }
277  delete [] count;
278  IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
279 
280 
281  // ----- Send and receive indices and values --------
282 
283  NT * recvdatbuf = new NT[totrecv];
284  MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
285  delete [] datbuf;
286 
287  IT * recvindbuf = new IT[totrecv];
288  MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
289  delete [] indbuf;
290 
291 
292  // ------ merge and sort received data ----------
293 
294  std::vector< std::pair<IT,NT> > tosort;
295  tosort.resize(totrecv);
296  for(int i=0; i<totrecv; ++i)
297  {
298  tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
299  }
300  DeleteAll(recvindbuf, recvdatbuf);
301  DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
302  std::sort(tosort.begin(), tosort.end());
303 
304 
305  // ------ create local sparse vector ----------
306 
307  ind.reserve(totrecv);
308  num.reserve(totrecv);
309  IT lastIndex=-1;
310  for(auto itr = tosort.begin(); itr != tosort.end(); ++itr)
311  {
312  if(lastIndex!=itr->first) //if SumDuplicates=false, keep only the first one
313  {
314  ind.push_back(itr->first);
315  num.push_back(itr->second);
316  lastIndex = itr->first;
317  }
318  else if(SumDuplicates)
319  {
320  num.back() += itr->second;
321  }
322  }
323 }
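// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): the constructor
// above takes *global* indices in a FullyDistVec and routes each (index, value) pair to
// its owner with MPI_Alltoallv. Names and sizes below are illustrative assumptions.
#ifdef COMBBLAS_DOC_EXAMPLES
static void Example_FromDistributedIndexValue(std::shared_ptr<CommGrid> fullWorld)
{
    // three nonzeros destined for a vector of global length 100
    FullyDistVec<int64_t, int64_t> inds(fullWorld, 3, 0);
    FullyDistVec<int64_t, double>  vals(fullWorld, 3, 0.0);
    inds.SetElement(0, 5);  vals.SetElement(0, 1.5);
    inds.SetElement(1, 5);  vals.SetElement(1, 2.5);   // duplicate global index 5
    inds.SetElement(2, 42); vals.SetElement(2, 7.0);
    FullyDistSpVec<int64_t, double> v(100, inds, vals, /*SumDuplicates*/ true); // v[5] = 4.0, v[42] = 7.0
}
#endif
// --------------------------------------------------------------------------------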
324 
325 
328 template <class IT, class NT>
329 template <typename _Predicate>
330 FullyDistVec<IT,NT> FullyDistSpVec<IT,NT>::FindVals(_Predicate pred) const
331 {
332  FullyDistVec<IT,NT> found(commGrid);
333  MPI_Comm World = commGrid->GetWorld();
334  int nprocs = commGrid->GetSize();
335  int rank = commGrid->GetRank();
336 
337  IT sizelocal = getlocnnz();
338  for(IT i=0; i<sizelocal; ++i)
339  {
340  if(pred(num[i]))
341  {
342  found.arr.push_back(num[i]);
343  }
344  }
345  IT * dist = new IT[nprocs];
346  IT nsize = found.arr.size();
347  dist[rank] = nsize;
348  MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
349  IT lengthuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
350  found.glen = std::accumulate(dist, dist+nprocs, static_cast<IT>(0));
351 
352  // Although the found vector is not reshuffled yet, its glen and commGrid are set
353  // We can call the Owner/MyLocLength/LengthUntil functions (to infer future distribution)
354 
355  // rebalance/redistribute
356  int * sendcnt = new int[nprocs];
357  std::fill(sendcnt, sendcnt+nprocs, 0);
358  for(IT i=0; i<nsize; ++i)
359  {
360  IT locind;
361  int owner = found.Owner(i+lengthuntil, locind);
362  ++sendcnt[owner];
363  }
364  int * recvcnt = new int[nprocs];
365  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the counts
366 
367  int * sdispls = new int[nprocs];
368  int * rdispls = new int[nprocs];
369  sdispls[0] = 0;
370  rdispls[0] = 0;
371  for(int i=0; i<nprocs-1; ++i)
372  {
373  sdispls[i+1] = sdispls[i] + sendcnt[i];
374  rdispls[i+1] = rdispls[i] + recvcnt[i];
375  }
376  IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
377  std::vector<NT> recvbuf(totrecv);
378 
379  // data is already in the right order in found.arr
380  MPI_Alltoallv(found.arr.data(), sendcnt, sdispls, MPIType<NT>(), recvbuf.data(), recvcnt, rdispls, MPIType<NT>(), World);
381  found.arr.swap(recvbuf);
382  delete [] dist;
383  DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
384 
385  return found;
386 }
387 
388 
389 
392 template <class IT, class NT>
393 template <typename _Predicate>
394 FullyDistVec<IT,IT> FullyDistSpVec<IT,NT>::FindInds(_Predicate pred) const
395 {
396  FullyDistVec<IT,IT> found(commGrid);
397  MPI_Comm World = commGrid->GetWorld();
398  int nprocs = commGrid->GetSize();
399  int rank = commGrid->GetRank();
400 
401  IT sizelocal = getlocnnz();
402  IT sizesofar = LengthUntil();
403  for(IT i=0; i<sizelocal; ++i)
404  {
405  if(pred(num[i]))
406  {
407  found.arr.push_back(ind[i]+sizesofar);
408  }
409  }
410  IT * dist = new IT[nprocs];
411  IT nsize = found.arr.size();
412  dist[rank] = nsize;
413  MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
414  IT lengthuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
415  found.glen = std::accumulate(dist, dist+nprocs, static_cast<IT>(0));
416 
417  // Although the found vector is not reshuffled yet, its glen and commGrid are set
418  // We can call the Owner/MyLocLength/LengthUntil functions (to infer future distribution)
419 
420  // rebalance/redistribute
421  int * sendcnt = new int[nprocs];
422  std::fill(sendcnt, sendcnt+nprocs, 0);
423  for(IT i=0; i<nsize; ++i)
424  {
425  IT locind;
426  int owner = found.Owner(i+lengthuntil, locind);
427  ++sendcnt[owner];
428  }
429  int * recvcnt = new int[nprocs];
430  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the counts
431 
432  int * sdispls = new int[nprocs];
433  int * rdispls = new int[nprocs];
434  sdispls[0] = 0;
435  rdispls[0] = 0;
436  for(int i=0; i<nprocs-1; ++i)
437  {
438  sdispls[i+1] = sdispls[i] + sendcnt[i];
439  rdispls[i+1] = rdispls[i] + recvcnt[i];
440  }
441  IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
442  std::vector<IT> recvbuf(totrecv);
443 
444  // data is already in the right order in found.arr
445  MPI_Alltoallv(found.arr.data(), sendcnt, sdispls, MPIType<IT>(), recvbuf.data(), recvcnt, rdispls, MPIType<IT>(), World);
446  found.arr.swap(recvbuf);
447  delete [] dist;
448  DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
449 
450  return found;
451 }
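// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): both predicate
// methods above return *dense* FullyDistVec results that are rebalanced across the grid;
// the first collects matching values, the second matching global indices. The method
// names follow the signatures reconstructed above; a lambda serves as the _Predicate.
#ifdef COMBBLAS_DOC_EXAMPLES
static void Example_FindIndsVals(FullyDistSpVec<int64_t, double> & frontier)
{
    FullyDistVec<int64_t, double>  bigvals = frontier.FindVals([](double x) { return x > 1.0; });
    FullyDistVec<int64_t, int64_t> bigidxs = frontier.FindInds([](double x) { return x > 1.0; });
}
#endif
// --------------------------------------------------------------------------------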
452 
453 
454 
455 template <class IT, class NT>
456 void FullyDistSpVec<IT,NT>::stealFrom(FullyDistSpVec<IT,NT> & victim)
457 {
458  FullyDist<IT,NT,typename combblas::disable_if< combblas::is_boolean<NT>::value, NT >::type>::operator= (victim); // to update glen and commGrid
459  ind.swap(victim.ind);
460  num.swap(victim.num);
461 }
462 
463 template <class IT, class NT>
464 NT FullyDistSpVec<IT,NT>::operator[](IT indx)
465 {
466  NT val;
467  IT locind;
468  int owner = Owner(indx, locind);
469  int found = 0;
470  if(commGrid->GetRank() == owner)
471  {
472  typename std::vector<IT>::const_iterator it = std::lower_bound(ind.begin(), ind.end(), locind); // ind is a sorted vector
473  if(it != ind.end() && locind == (*it)) // found
474  {
475  val = num[it-ind.begin()];
476  found = 1;
477  }
478  else
479  {
480  val = NT(); // not found: return a default-constructed NT
481  found = 0;
482  }
483  }
484  MPI_Bcast(&found, 1, MPI_INT, owner, commGrid->GetWorld());
485  MPI_Bcast(&val, 1, MPIType<NT>(), owner, commGrid->GetWorld());
486  wasFound = found;
487  return val;
488 }
489 
490 template <class IT, class NT>
491 NT FullyDistSpVec<IT,NT>::GetLocalElement(IT indx)
492 {
493  NT val = NT();
494  IT locind;
495  int owner = Owner(indx, locind);
496  int found = 0;
497  if(commGrid->GetRank() == owner) {
498  typename std::vector<IT>::const_iterator it = std::lower_bound(ind.begin(), ind.end(), locind); // ind is sorted; locind is only valid on the owner
499  if(it != ind.end() && locind == (*it)) // found
500  {
501  val = num[it-ind.begin()];
502  found = 1;
503  }
504  }
505  wasFound = found;
506  return val;
507 }
508 
510 template <class IT, class NT>
511 void FullyDistSpVec<IT,NT>::SetElement (IT indx, NT numx)
512 {
513  if(glen == 0)
514  SpParHelper::Print("WARNING: SetElement() called on a vector with zero length\n");
515 
516  IT locind;
517  int owner = Owner(indx, locind);
518  if(commGrid->GetRank() == owner)
519  {
520  typename std::vector<IT>::iterator iter = std::lower_bound(ind.begin(), ind.end(), locind);
521  if(iter == ind.end()) // beyond limits, insert from back
522  {
523  ind.push_back(locind);
524  num.push_back(numx);
525  }
526  else if (locind < *iter) // not found, insert in the middle
527  {
528  // the order of insertions is crucial
529  // if we first insert to ind, then ind.begin() is invalidated !
530  num.insert(num.begin() + (iter-ind.begin()), numx);
531  ind.insert(iter, locind);
532  }
533  else // found
534  {
535  *(num.begin() + (iter-ind.begin())) = numx;
536  }
537  }
538 }
539 
540 template <class IT, class NT>
541 void FullyDistSpVec<IT,NT>::DelElement (IT indx)
542 {
543  IT locind;
544  int owner = Owner(indx, locind);
545  if(commGrid->GetRank() == owner)
546  {
547  typename std::vector<IT>::iterator iter = std::lower_bound(ind.begin(), ind.end(), locind);
548  if(iter != ind.end() && !(locind < *iter))
549  {
550  num.erase(num.begin() + (iter-ind.begin()));
551  ind.erase(iter);
552  }
553  }
554 }
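// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): SetElement and
// DelElement do their work only on the owner rank (no communication), whereas operator[]
// above broadcasts the value from the owner, so it must be called by every rank of the
// grid. Index and value below are illustrative.
#ifdef COMBBLAS_DOC_EXAMPLES
static void Example_ElementAccess(FullyDistSpVec<int64_t, double> & v)
{
    v.SetElement(5, 3.14);   // inserted (or overwritten) on the owner of global index 5
    double x = v[5];         // collective: MPI_Bcast from the owner; NT() if index 5 is not stored
    v.DelElement(5);         // removed on the owner if present
    (void) x;
}
#endif
// --------------------------------------------------------------------------------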
555 
564 template <class IT, class NT>
565 FullyDistVec<IT,NT> FullyDistSpVec<IT,NT>::operator() (const FullyDistVec<IT,IT> & ri) const
566 {
567  MPI_Comm World = commGrid->GetWorld();
568  FullyDistVec<IT,NT> Indexed(ri.commGrid, ri.glen, NT()); // NT() is the initial value
569  int nprocs;
570  MPI_Comm_size(World, &nprocs);
571  std::unordered_map<IT, IT> revr_map; // inverted index that maps indices of *this to indices of output
572  std::vector< std::vector<IT> > data_req(nprocs);
573  IT locnnz = ri.LocArrSize();
574 
575  // ABAB: Input sanity check
576  int local = 1;
577  int whole = 1;
578  for(IT i=0; i < locnnz; ++i)
579  {
580  if(ri.arr[i] >= glen || ri.arr[i] < 0)
581  {
582  local = 0;
583  }
584  }
585  MPI_Allreduce( &local, &whole, 1, MPI_INT, MPI_BAND, World);
586  if(whole == 0)
587  {
588  throw outofrangeexception();
589  }
590 
591  for(IT i=0; i < locnnz; ++i)
592  {
593  IT locind;
594  int owner = Owner(ri.arr[i], locind); // numerical values in ri are 0-based
595  data_req[owner].push_back(locind);
596  revr_map.insert(typename std::unordered_map<IT, IT>::value_type(locind, i));
597  }
598  IT * sendbuf = new IT[locnnz];
599  int * sendcnt = new int[nprocs];
600  int * sdispls = new int[nprocs];
601  for(int i=0; i<nprocs; ++i)
602  sendcnt[i] = data_req[i].size();
603 
604  int * rdispls = new int[nprocs];
605  int * recvcnt = new int[nprocs];
606  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the request counts
607 
608  sdispls[0] = 0;
609  rdispls[0] = 0;
610  for(int i=0; i<nprocs-1; ++i)
611  {
612  sdispls[i+1] = sdispls[i] + sendcnt[i];
613  rdispls[i+1] = rdispls[i] + recvcnt[i];
614  }
615  IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
616  IT * recvbuf = new IT[totrecv];
617 
618  for(int i=0; i<nprocs; ++i)
619  {
620  std::copy(data_req[i].begin(), data_req[i].end(), sendbuf+sdispls[i]);
621  std::vector<IT>().swap(data_req[i]);
622  }
623  MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<IT>(), recvbuf, recvcnt, rdispls, MPIType<IT>(), World); // request data
624 
625  // We will return the requested data,
626  // our return can be at most as big as the request
627  // and smaller if we are missing some elements
628  IT * indsback = new IT[totrecv];
629  NT * databack = new NT[totrecv];
630 
631  int * ddispls = new int[nprocs];
632  std::copy(rdispls, rdispls+nprocs, ddispls);
633  for(int i=0; i<nprocs; ++i)
634  {
635  // this is not the most efficient method because it scans the ind vector nprocs times
636  IT * it = std::set_intersection(recvbuf+rdispls[i], recvbuf+rdispls[i]+recvcnt[i], ind.begin(), ind.end(), indsback+rdispls[i]);
637  recvcnt[i] = (it - (indsback+rdispls[i])); // update with size of the intersection
638 
639  IT vi = 0;
640  for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j) // fetch the numerical values
641  {
642  // indsback is a subset of ind
643  while(indsback[j] > ind[vi])
644  ++vi;
645  databack[j] = num[vi++];
646  }
647  }
648 
649  DeleteAll(recvbuf, ddispls);
650  NT * databuf = new NT[ri.LocArrSize()];
651 
652  MPI_Alltoall(recvcnt, 1, MPI_INT, sendcnt, 1, MPI_INT, World); // share the response counts, overriding request counts
653  MPI_Alltoallv(indsback, recvcnt, rdispls, MPIType<IT>(), sendbuf, sendcnt, sdispls, MPIType<IT>(), World); // send indices
654  MPI_Alltoallv(databack, recvcnt, rdispls, MPIType<NT>(), databuf, sendcnt, sdispls, MPIType<NT>(), World); // send data
655  DeleteAll(rdispls, recvcnt, indsback, databack);
656 
657  // Now create the output from databuf (holds numerical values) and sendbuf (holds indices)
658  // arr is already resized during its construction
659  for(int i=0; i<nprocs; ++i)
660  {
661  // data will come globally sorted from processors
662  // i.e. ind owned by proc_i is always smaller than
663  // ind owned by proc_j whenever i < j
664  for(int j=sdispls[i]; j< sdispls[i]+sendcnt[i]; ++j)
665  {
666  typename std::unordered_map<IT,IT>::iterator it = revr_map.find(sendbuf[j]);
667  Indexed.arr[it->second] = databuf[j];
668  // cout << it->second << "(" << sendbuf[j] << "):" << databuf[j] << endl;
669  }
670  }
671  DeleteAll(sdispls, sendcnt, sendbuf, databuf);
672  return Indexed;
673 }
674 
675 template <class IT, class NT>
676 void FullyDistSpVec<IT,NT>::iota(IT globalsize, NT first)
677 {
678  glen = globalsize;
679  IT length = MyLocLength();
680  ind.resize(length);
681  num.resize(length);
682  SpHelper::iota(ind.begin(), ind.end(), 0); // offset'd within processors
683  SpHelper::iota(num.begin(), num.end(), LengthUntil() + first); // global across processors
684 }
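// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): iota above fills
// the whole vector; local indices become 0..loclen-1 and the values form a global
// sequence starting at `first`, offset by LengthUntil() on each rank.
#ifdef COMBBLAS_DOC_EXAMPLES
static void Example_Iota(std::shared_ptr<CommGrid> fullWorld)
{
    FullyDistSpVec<int64_t, int64_t> ramp(fullWorld);
    ramp.iota(100, 0);   // ramp now holds 100 entries with values 0,1,...,99 spread over the grid
}
#endif
// --------------------------------------------------------------------------------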
685 
686 
688 template <class IT, class NT>
689 void FullyDistSpVec<IT,NT>::nziota(NT first)
690 {
691  std::iota(num.begin(), num.end(), NnzUntil() + first); // global across processors
692 }
693 
695 template <class IT, class NT>
696 IT FullyDistSpVec<IT,NT>::NnzUntil() const
697 {
698  IT mynnz = ind.size();
699  IT prevnnz = 0;
700  MPI_Scan(&mynnz, &prevnnz, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
701  return (prevnnz - mynnz);
702 }
703 
704 
705 
706 /* old version
707 // - sorts the entries with respect to nonzero values
708 // - ignores structural zeros
709 // - keeps the sparsity structure intact
710 // - returns a permutation representing the mapping from old to new locations
711 template <class IT, class NT>
712 FullyDistSpVec<IT, IT> FullyDistSpVec<IT, NT>::sort()
713 {
714  MPI_Comm World = commGrid->GetWorld();
715  FullyDistSpVec<IT,IT> temp(commGrid);
716  if(getnnz()==0) return temp;
717  IT nnz = getlocnnz();
718  pair<NT,IT> * vecpair = new pair<NT,IT>[nnz];
719 
720 
721 
722  int nprocs, rank;
723  MPI_Comm_size(World, &nprocs);
724  MPI_Comm_rank(World, &rank);
725 
726  IT * dist = new IT[nprocs];
727  dist[rank] = nnz;
728  MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
729  IT until = LengthUntil();
730  for(IT i=0; i< nnz; ++i)
731  {
732  vecpair[i].first = num[i]; // we'll sort wrt numerical values
733  vecpair[i].second = ind[i] + until;
734 
735  }
736  SpParHelper::MemoryEfficientPSort(vecpair, nnz, dist, World);
737 
738  vector< IT > nind(nnz);
739  vector< IT > nnum(nnz);
740 
741  for(IT i=0; i< nnz; ++i)
742  {
743  num[i] = vecpair[i].first; // sorted range (change the object itself)
744  nind[i] = ind[i]; // make sure the sparsity distribution is the same
745  nnum[i] = vecpair[i].second; // inverse permutation stored as numerical values
746 
747  }
748  delete [] vecpair;
749  delete [] dist;
750 
751  temp.glen = glen;
752  temp.ind = nind;
753  temp.num = nnum;
754  return temp;
755 }
756 */
757 
758 
759 /*
760  TODO: This function is just a hack at this moment.
761  The indices of the return vector is not correct.
762  FIX this
763  */
764 // - sorts the entries with respect to nonzero values
765 // - ignores structural zeros
766 // - keeps the sparsity structure intact
767 // - returns a permutation representing the mapping from old to new locations
768 template <class IT, class NT>
769 FullyDistSpVec<IT, IT> FullyDistSpVec<IT, NT>::sort()
770 {
771  MPI_Comm World = commGrid->GetWorld();
772  FullyDistSpVec<IT,IT> temp(commGrid);
773  if(getnnz()==0) return temp;
774  IT nnz = getlocnnz();
775  std::pair<NT,IT> * vecpair = new std::pair<NT,IT>[nnz];
776 
777 
778 
779  int nprocs, rank;
780  MPI_Comm_size(World, &nprocs);
781  MPI_Comm_rank(World, &rank);
782 
783  IT * dist = new IT[nprocs];
784  dist[rank] = nnz;
785  MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
786  IT until = LengthUntil();
787 #ifdef THREADED
788 #pragma omp parallel for
789 #endif
790  for(IT i=0; i< nnz; ++i)
791  {
792  vecpair[i].first = num[i]; // we'll sort wrt numerical values
793  vecpair[i].second = ind[i] + until;
794 
795  }
796  std::vector<std::pair<NT,IT>> sorted = SpParHelper::KeyValuePSort(vecpair, nnz, dist, World);
797 
798  nnz = sorted.size();
799  temp.num.resize(nnz);
800  temp.ind.resize(nnz);
801 
802 #ifdef THREADED
803 #pragma omp parallel for
804 #endif
805  for(IT i=0; i< nnz; ++i)
806  {
807  //num[i] = sorted[i].first; // sorted range (change the object itself)
808  //nind[i] = ind[i]; // make sure the sparsity distribution is the same
809  temp.num[i] = sorted[i].second; // inverse permutation stored as numerical values
810  temp.ind[i] = i; // we are not using this information at this moment
811  }
812 
813  delete [] vecpair;
814  delete [] dist;
815 
816  temp.glen = glen;
817  return temp;
818 }
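// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): per the TODO
// above, the returned vector's values are the original global indices of the entries in
// value-sorted order, while its own indices are placeholders; treat the result as a
// permutation rather than a well-formed sparse vector.
#ifdef COMBBLAS_DOC_EXAMPLES
static void Example_Sort(FullyDistSpVec<int64_t, double> & v)
{
    FullyDistSpVec<int64_t, int64_t> perm = v.sort();  // perm values: global index of the k-th smallest entry
    (void) perm;
}
#endif
// --------------------------------------------------------------------------------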
819 
820 
821 
822 /*
823 // - sorts the entries with respect to nonzero values
824 // - ignores structural zeros
825 // - keeps the sparsity structure intact
826 // - returns a permutation representing the mapping from old to new locations
827 template <class IT, class NT>
828 FullyDistSpVec<IT, IT> FullyDistSpVec<IT, NT>::sort()
829 {
830  MPI_Comm World = commGrid->GetWorld();
831  FullyDistSpVec<IT,IT> temp(commGrid);
832  if(getnnz()==0) return temp;
833  IT nnz = getlocnnz();
834  //pair<NT,IT> * vecpair = new pair<NT,IT>[nnz];
835  vector<IndexHolder<NT>> in(nnz);
836  //vector<IndexHolder<NT>> out;
837 
838 
839 
840  int nprocs, rank;
841  MPI_Comm_size(World, &nprocs);
842  MPI_Comm_rank(World, &rank);
843 
844  //IT * dist = new IT[nprocs];
845  //dist[rank] = nnz;
846  //MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
847  IT until = LengthUntil();
848  for(IT i=0; i< nnz; ++i)
849  {
850  //vecpair[i].first = num[i]; // we'll sort wrt numerical values
851  //vecpair[i].second = ind[i] + until;
852 
853  in[i] = IndexHolder<NT>(num[i], static_cast<unsigned long>(ind[i] + until));
854  }
855  //SpParHelper::MemoryEfficientPSort(vecpair, nnz, dist, World);
856 
857  //MPI_Barrier(World);
858  //cout << "before sorting " << in.size() << endl;
859  par::sampleSort(in, World);
860  //MPI_Barrier(World);
861  //cout << "after sorting " << in.size() << endl;
862  //MPI_Barrier(World);
863 
864  //vector< IT > nind(out.size());
865  //vector< IT > nnum(out.size());
866 
867  temp.ind.resize(in.size());
868  temp.num.resize(in.size());
869  for(IT i=0; i< in.size(); ++i)
870  {
871  //num[i] = vecpair[i].first; // sorted range (change the object itself)
872  //nind[i] = ind[i]; // make sure the sparsity distribution is the same
873  //nnum[i] = vecpair[i].second; // inverse permutation stored as numerical values
874 
875  //num[i] = out[i].value; // sorted range (change the object itself)
876  //nind[i] = ind[i]; // make sure the sparsity distribution is the same
877  //nnum[i] = static_cast<IT>(out[i].index); // inverse permutation stored as numerical values
878  temp.num[i] = static_cast<IT>(in[i].index); // inverse permutation stored as numerical values
879  //cout << temp.num[i] << " " ;
880  }
881 
882  temp.glen = glen;
883  return temp;
884 }
885  */
886 
887 
888 template <class IT, class NT>
889 template <typename _BinaryOperation >
890 FullyDistSpVec<IT,NT> FullyDistSpVec<IT, NT>::UniqAll2All(_BinaryOperation __binary_op, MPI_Op mympiop)
891 {
892  MPI_Comm World = commGrid->GetWorld();
893  int nprocs = commGrid->GetSize();
894 
895  std::vector< std::vector< NT > > datsent(nprocs);
896  std::vector< std::vector< IT > > indsent(nprocs);
897 
898  IT locind;
899  size_t locvec = num.size(); // nnz in local vector
900  IT lenuntil = LengthUntil(); // to convert to global index
901  for(size_t i=0; i< locvec; ++i)
902  {
903  uint64_t myhash; // output of MurmurHash3_x64_64 is 64-bits regardless of the input length
904  MurmurHash3_x64_64((const void*) &(num[i]),sizeof(NT), 0, &myhash);
905  double range = static_cast<double>(myhash) * static_cast<double>(glen);
906  NT mapped = range / static_cast<double>(std::numeric_limits<uint64_t>::max()); // mapped is in range [0,n)
907  int owner = Owner(mapped, locind);
908 
909  datsent[owner].push_back(num[i]); // all identical entries will be hashed to the same value -> same processor
910  indsent[owner].push_back(ind[i]+lenuntil);
911  }
912  int * sendcnt = new int[nprocs];
913  int * sdispls = new int[nprocs];
914  for(int i=0; i<nprocs; ++i)
915  sendcnt[i] = (int) datsent[i].size();
916 
917  int * rdispls = new int[nprocs];
918  int * recvcnt = new int[nprocs];
919  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the request counts
920  sdispls[0] = 0;
921  rdispls[0] = 0;
922  for(int i=0; i<nprocs-1; ++i)
923  {
924  sdispls[i+1] = sdispls[i] + sendcnt[i];
925  rdispls[i+1] = rdispls[i] + recvcnt[i];
926  }
927  NT * datbuf = new NT[locvec];
928  for(int i=0; i<nprocs; ++i)
929  {
930  std::copy(datsent[i].begin(), datsent[i].end(), datbuf+sdispls[i]);
931  std::vector<NT>().swap(datsent[i]);
932  }
933  IT * indbuf = new IT[locvec];
934  for(int i=0; i<nprocs; ++i)
935  {
936  std::copy(indsent[i].begin(), indsent[i].end(), indbuf+sdispls[i]);
937  std::vector<IT>().swap(indsent[i]);
938  }
939  IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
940  NT * recvdatbuf = new NT[totrecv];
941  MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
942  delete [] datbuf;
943 
944  IT * recvindbuf = new IT[totrecv];
945  MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
946  delete [] indbuf;
947 
948  std::vector< std::pair<NT,IT> > tosort; // in fact, tomerge would be a better name but it is unlikely to be faster
949 
950  for(int i=0; i<nprocs; ++i)
951  {
952  for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j) // fetch the numerical values
953  {
954  tosort.push_back(std::make_pair(recvdatbuf[j], recvindbuf[j]));
955  }
956  }
957  DeleteAll(recvindbuf, recvdatbuf);
958  std::sort(tosort.begin(), tosort.end());
959  //std::unique returns an iterator to the element that follows the last element not removed.
960  typename std::vector< std::pair<NT,IT> >::iterator last;
961  last = std::unique (tosort.begin(), tosort.end(), equal_first<NT,IT>());
962 
963  std::vector< std::vector< NT > > datback(nprocs);
964  std::vector< std::vector< IT > > indback(nprocs);
965 
966  for(typename std::vector< std::pair<NT,IT> >::iterator itr = tosort.begin(); itr != last; ++itr)
967  {
968  IT locind;
969  int owner = Owner(itr->second, locind);
970 
971  datback[owner].push_back(itr->first);
972  indback[owner].push_back(locind);
973  }
974  for(int i=0; i<nprocs; ++i) sendcnt[i] = (int) datback[i].size();
975  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the request counts
976  for(int i=0; i<nprocs-1; ++i)
977  {
978  sdispls[i+1] = sdispls[i] + sendcnt[i];
979  rdispls[i+1] = rdispls[i] + recvcnt[i];
980  }
981  datbuf = new NT[tosort.size()];
982  for(int i=0; i<nprocs; ++i)
983  {
984  std::copy(datback[i].begin(), datback[i].end(), datbuf+sdispls[i]);
985  std::vector<NT>().swap(datback[i]);
986  }
987  indbuf = new IT[tosort.size()];
988  for(int i=0; i<nprocs; ++i)
989  {
990  std::copy(indback[i].begin(), indback[i].end(), indbuf+sdispls[i]);
991  std::vector<IT>().swap(indback[i]);
992  }
993  totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0)); // update value
994 
995  recvdatbuf = new NT[totrecv];
996  MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
997  delete [] datbuf;
998 
999  recvindbuf = new IT[totrecv];
1000  MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
1001  delete [] indbuf;
1002 
1003  FullyDistSpVec<IT,NT> Indexed(commGrid, glen); // length(Indexed) = length(glen) = length(*this)
1004 
1005  std::vector< std::pair<IT,NT> > back2sort;
1006  for(int i=0; i<nprocs; ++i)
1007  {
1008  for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j) // fetch the numerical values
1009  {
1010  back2sort.push_back(std::make_pair(recvindbuf[j], recvdatbuf[j]));
1011  }
1012  }
1013  std::sort(back2sort.begin(), back2sort.end());
1014  for(typename std::vector< std::pair<IT,NT> >::iterator itr = back2sort.begin(); itr != back2sort.end(); ++itr)
1015  {
1016  Indexed.ind.push_back(itr->first);
1017  Indexed.num.push_back(itr->second);
1018  }
1019 
1020  DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
1021  DeleteAll(recvindbuf, recvdatbuf);
1022  return Indexed;
1023 }
1024 
1025 
1026 // ABAB: \todo Concept control so it only gets called in integers
1027 template <class IT, class NT>
1028 template <typename _BinaryOperation >
1029 FullyDistSpVec<IT,NT> FullyDistSpVec<IT, NT>::Uniq(_BinaryOperation __binary_op, MPI_Op mympiop)
1030 {
1031  return UniqAll2All(__binary_op, mympiop);
1032 }
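// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): Uniq (via
// UniqAll2All above) hashes values to owner ranks, keeps one representative per distinct
// value (the occurrence with the smallest global index, after the sort/unique pass), and
// returns the result in the original distribution. The __binary_op / mympiop arguments
// are not used by this all-to-all implementation, so the choices below are placeholders;
// per the ABAB note above, NT should be an integer type.
#ifdef COMBBLAS_DOC_EXAMPLES
static void Example_Uniq(FullyDistSpVec<int64_t, int64_t> & labels)
{
    FullyDistSpVec<int64_t, int64_t> distinct = labels.Uniq(std::plus<int64_t>(), MPI_SUM);
    (void) distinct;
}
#endif
// --------------------------------------------------------------------------------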
1033 
1034 template <class IT, class NT>
1035 FullyDistSpVec<IT,NT> & FullyDistSpVec<IT,NT>::operator+=(const FullyDistSpVec<IT,NT> & rhs)
1036 {
1037  if(this != &rhs)
1038  {
1039  if(glen != rhs.glen)
1040  {
1041  std::cerr << "Vector dimensions don't match for addition\n";
1042  return *this;
1043  }
1044  IT lsize = getlocnnz();
1045  IT rsize = rhs.getlocnnz();
1046 
1047  std::vector< IT > nind;
1048  std::vector< NT > nnum;
1049  nind.reserve(lsize+rsize);
1050  nnum.reserve(lsize+rsize);
1051 
1052  IT i=0, j=0;
1053  while(i < lsize && j < rsize)
1054  {
1055  // assignment won't change the size of vector, push_back is necessary
1056  if(ind[i] > rhs.ind[j])
1057  {
1058  nind.push_back( rhs.ind[j] );
1059  nnum.push_back( rhs.num[j++] );
1060  }
1061  else if(ind[i] < rhs.ind[j])
1062  {
1063  nind.push_back( ind[i] );
1064  nnum.push_back( num[i++] );
1065  }
1066  else
1067  {
1068  nind.push_back( ind[i] );
1069  nnum.push_back( num[i++] + rhs.num[j++] );
1070  }
1071  }
1072  while( i < lsize) // rhs was depleted first
1073  {
1074  nind.push_back( ind[i] );
1075  nnum.push_back( num[i++] );
1076  }
1077  while( j < rsize) // *this was depleted first
1078  {
1079  nind.push_back( rhs.ind[j] );
1080  nnum.push_back( rhs.num[j++] );
1081  }
1082  ind.swap(nind); // ind will contain the elements of nind with capacity shrunk-to-fit size
1083  num.swap(nnum);
1084  }
1085  else
1086  {
1087  typename std::vector<NT>::iterator it;
1088  for(it = num.begin(); it != num.end(); ++it)
1089  (*it) *= 2;
1090  }
1091  return *this;
1092 };
1093 template <class IT, class NT>
1094 FullyDistSpVec<IT,NT> & FullyDistSpVec<IT,NT>::operator-=(const FullyDistSpVec<IT,NT> & rhs)
1095 {
1096  if(this != &rhs)
1097  {
1098  if(glen != rhs.glen)
1099  {
1100  std::cerr << "Vector dimensions don't match for subtraction\n";
1101  return *this;
1102  }
1103  IT lsize = getlocnnz();
1104  IT rsize = rhs.getlocnnz();
1105  std::vector< IT > nind;
1106  std::vector< NT > nnum;
1107  nind.reserve(lsize+rsize);
1108  nnum.reserve(lsize+rsize);
1109 
1110  IT i=0, j=0;
1111  while(i < lsize && j < rsize)
1112  {
1113  // assignment won't change the size of vector, push_back is necessary
1114  if(ind[i] > rhs.ind[j])
1115  {
1116  nind.push_back( rhs.ind[j] );
1117  nnum.push_back( -static_cast<NT>(rhs.num[j++]) );
1118  }
1119  else if(ind[i] < rhs.ind[j])
1120  {
1121  nind.push_back( ind[i] );
1122  nnum.push_back( num[i++] );
1123  }
1124  else
1125  {
1126  nind.push_back( ind[i] );
1127  nnum.push_back( num[i++] - rhs.num[j++] ); // ignore numerical cancellations
1128  }
1129  }
1130  while( i < lsize) // rhs was depleted first
1131  {
1132  nind.push_back( ind[i] );
1133  nnum.push_back( num[i++] );
1134  }
1135  while( j < rsize) // *this was depleted first
1136  {
1137  nind.push_back( rhs.ind[j] );
1138  nnum.push_back( NT() - (rhs.num[j++]) );
1139  }
1140  ind.swap(nind); // ind will contain the elements of nind with capacity shrunk-to-fit size
1141  num.swap(nnum);
1142  }
1143  else
1144  {
1145  ind.clear();
1146  num.clear();
1147  }
1148  return *this;
1149 };
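// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): both operators
// above merge the two local nonzero lists (the vectors share the same distribution), so
// no communication is needed; global lengths must match or the call returns unchanged
// with an error message. a += a doubles the values, a -= a empties the vector.
#ifdef COMBBLAS_DOC_EXAMPLES
static void Example_AddSubtract(FullyDistSpVec<int64_t, double> & a,
                                const FullyDistSpVec<int64_t, double> & b)
{
    a += b;   // union of sparsity patterns, values summed where both are present
    a -= b;   // structural zeros produced by cancellation are kept (see the comment above)
}
#endif
// --------------------------------------------------------------------------------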
1150 
1151 
1152 template <class IT, class NT>
1153 template <typename _BinaryOperation>
1154 void FullyDistSpVec<IT,NT>::SparseCommon(std::vector< std::vector < std::pair<IT,NT> > > & data, _BinaryOperation BinOp)
1155 {
1156  int nprocs = commGrid->GetSize();
1157  int * sendcnt = new int[nprocs];
1158  int * recvcnt = new int[nprocs];
1159  for(int i=0; i<nprocs; ++i)
1160  sendcnt[i] = data[i].size(); // sizes are all the same
1161 
1162  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
1163  int * sdispls = new int[nprocs]();
1164  int * rdispls = new int[nprocs]();
1165  std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
1166  std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
1167  IT totrecv = rdispls[nprocs-1]+recvcnt[nprocs-1];
1168  IT totsend = sdispls[nprocs-1]+sendcnt[nprocs-1];
1169 
1170 
1171  std::pair<IT,NT> * senddata = new std::pair<IT,NT>[totsend]; // re-used for both rows and columns
1172  for(int i=0; i<nprocs; ++i)
1173  {
1174  std::copy(data[i].begin(), data[i].end(), senddata+sdispls[i]);
1175  std::vector< std::pair<IT,NT> >().swap(data[i]); // clear memory
1176  }
1177  MPI_Datatype MPI_pair;
1178  MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &MPI_pair); // the buffers hold std::pair<IT,NT>, not std::tuple
1179  MPI_Type_commit(&MPI_pair);
1180 
1181  std::pair<IT,NT> * recvdata = new std::pair<IT,NT>[totrecv];
1182  MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_pair, recvdata, recvcnt, rdispls, MPI_pair, commGrid->GetWorld());
1183 
1184  DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
1185  MPI_Type_free(&MPI_pair);
1186 
1187  if(!std::is_sorted(recvdata, recvdata+totrecv))
1188  std::sort(recvdata, recvdata+totrecv);
1189 
1190  if(totrecv > 0) // guard against an empty receive buffer
1191  { ind.push_back(recvdata[0].first); num.push_back(recvdata[0].second); }
1192  for(IT i=1; i< totrecv; ++i)
1193  {
1194  if(ind.back() == recvdata[i].first)
1195  {
1196  num.back() = BinOp(num.back(), recvdata[i].second);
1197  }
1198  else
1199  {
1200  ind.push_back(recvdata[i].first);
1201  num.push_back(recvdata[i].second);
1202  }
1203  }
1204  delete [] recvdata;
1205 }
1206 
1207 template <class IT, class NT>
1208 template <typename _BinaryOperation>
1209 void FullyDistSpVec<IT,NT>::ParallelRead (const std::string & filename, bool onebased, _BinaryOperation BinOp)
1210 {
1211  int64_t gnnz; // global nonzeros (glen is already declared as part of this class's private data)
1212  int64_t linesread = 0;
1213 
1214  FILE *f;
1215  int myrank = commGrid->GetRank();
1216  int nprocs = commGrid->GetSize();
1217  if(myrank == 0)
1218  {
1219  if((f = fopen(filename.c_str(),"r"))==NULL)
1220  {
1221  std::cout << "File failed to open\n";
1222  MPI_Abort(commGrid->commWorld, NOFILE);
1223  }
1224  else
1225  {
1226  fscanf(f,"%lld %lld\n", &glen, &gnnz);
1227  }
1228  std::cout << "Total number of nonzeros expected across all processors is " << gnnz << std::endl;
1229 
1230  }
1231  MPI_Bcast(&glen, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
1232  MPI_Bcast(&gnnz, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
1233 
1234 
1235  struct stat st; // get file size
1236  if (stat(filename.c_str(), &st) == -1)
1237  {
1238  MPI_Abort(commGrid->commWorld, NOFILE);
1239  }
1240  int64_t file_size = st.st_size;
1241  MPI_Offset fpos, end_fpos;
1242  if(myrank == 0) // the offset needs to be for this rank
1243  {
1244  std::cout << "File is " << file_size << " bytes" << std::endl;
1245  fpos = ftell(f);
1246  fclose(f);
1247  }
1248  else
1249  {
1250  fpos = myrank * file_size / nprocs;
1251  }
1252  if(myrank != (nprocs-1)) end_fpos = (myrank + 1) * file_size / nprocs;
1253  else end_fpos = file_size;
1254  MPI_Barrier(commGrid->commWorld);
1255 
1256  MPI_File mpi_fh;
1257  MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
1258 
1259  std::vector< std::vector < std::pair<IT,NT> > > data(nprocs); // data to send
1260 
1261  std::vector<std::string> lines;
1262 
1263  SpParHelper::Print("Fetching first piece\n");
1264 
1265  MPI_Barrier(commGrid->commWorld);
1266  bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
1267  int64_t entriesread = lines.size();
1268  SpParHelper::Print("Fetched first piece\n");
1269 
1270  MPI_Barrier(commGrid->commWorld);
1271 
1272  IT ii;
1273  NT vv;
1274  for (auto itr=lines.begin(); itr != lines.end(); ++itr)
1275  {
1276  std::stringstream ss(*itr);
1277  ss >> ii >> vv;
1278  if(onebased) ii--;
1279  IT locind;
1280  int owner = Owner(ii, locind); // recipient (owner) processor
1281  data[owner].push_back(std::make_pair(locind,vv));
1282  }
1283 
1284  while(!finished)
1285  {
1286  finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
1287  entriesread += lines.size();
1288  for (auto itr=lines.begin(); itr != lines.end(); ++itr)
1289  {
1290  std::stringstream ss(*itr);
1291  ss >> ii >> vv;
1292  if(onebased) ii--;
1293  IT locind;
1294  int owner = Owner(ii, locind); // recipient (owner) processor
1295  data[owner].push_back(std::make_pair(locind,vv));
1296  }
1297  }
1298  int64_t allentriesread;
1299  MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
1300 #ifdef COMBBLAS_DEBUG
1301  if(myrank == 0)
1302  std::cout << "Reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
1303 #endif
1304 
1305  SparseCommon(data, BinOp);
1306 }
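// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): ParallelRead
// above expects a header line "length nnz" followed by "index value" lines, and the
// binary operation resolves duplicate indices. maximum<> is the CombBLAS functor used
// elsewhere in this file; the file name is illustrative.
#ifdef COMBBLAS_DOC_EXAMPLES
static void Example_ParallelRead(std::shared_ptr<CommGrid> fullWorld)
{
    FullyDistSpVec<int64_t, double> v(fullWorld);
    v.ParallelRead("input_vector.txt", /*onebased*/ true, maximum<double>()); // keep the larger of duplicates
}
#endif
// --------------------------------------------------------------------------------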
1307 
1308 template <class IT, class NT>
1309 template <class HANDLER>
1310 void FullyDistSpVec<IT,NT>::ParallelWrite(const std::string & filename, bool onebased, HANDLER handler, bool includeindices, bool includeheader)
1311 {
1312  int myrank = commGrid->GetRank();
1313  int nprocs = commGrid->GetSize();
1314  IT totalLength = TotalLength();
1315  IT totalNNZ = getnnz();
1316 
1317  std::stringstream ss;
1318  if(includeheader && myrank == 0)
1319  {
1320  ss << totalLength << '\t' << totalNNZ << '\n'; // rank-0 has the header
1321  }
1322  IT entries = getlocnnz();
1323  IT sizeuntil = 0;
1324  MPI_Exscan( &entries, &sizeuntil, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld() );
1325  if(myrank == 0) sizeuntil = 0; // because MPI_Exscan says the recvbuf in process 0 is undefined
1326 
1327  if(includeindices)
1328  {
1329  if(onebased) sizeuntil += 1; // increment by 1
1330 
1331  for(IT i=0; i< entries; ++i)
1332  {
1333  ss << ind[i]+sizeuntil << '\t';
1334  handler.save(ss, num[i], ind[i]+sizeuntil);
1335  ss << '\n';
1336  }
1337  }
1338  else // the base doesn't matter if we don't include indices
1339  {
1340  IT dummy = 0; // dummy because we don't want indices to be printed
1341  for(IT i=0; i< entries; ++i)
1342  {
1343  handler.save(ss, num[i], dummy);
1344  ss << '\n';
1345  }
1346  }
1347 
1348  std::string text = ss.str();
1349 
1350  int64_t * bytes = new int64_t[nprocs];
1351  bytes[myrank] = text.size();
1352  MPI_Allgather(MPI_IN_PLACE, 1, MPIType<int64_t>(), bytes, 1, MPIType<int64_t>(), commGrid->GetWorld());
1353  int64_t bytesuntil = std::accumulate(bytes, bytes+myrank, static_cast<int64_t>(0));
1354  int64_t bytestotal = std::accumulate(bytes, bytes+nprocs, static_cast<int64_t>(0));
1355 
1356  if(myrank == 0) // only the leader writes the (initially empty) file
1357  {
1358  std::ofstream ofs(filename.c_str(), std::ios::binary | std::ios::out);
1359 #ifdef COMBBLAS_DEBUG
1360  std::cout << "Creating file with " << bytestotal << " bytes" << std::endl;
1361 #endif
1362  ofs.seekp(bytestotal - 1);
1363  ofs.write("", 1); // this will likely create a sparse file so the actual disks won't spin yet
1364  ofs.close();
1365  }
1366  MPI_Barrier(commGrid->GetWorld());
1367 
1368  struct stat st; // get file size
1369  if (stat(filename.c_str(), &st) == -1)
1370  {
1371  MPI_Abort(commGrid->GetWorld(), NOFILE);
1372  }
1373  if(myrank == nprocs-1) // let some other processor do the testing
1374  {
1375 #ifdef COMBBLAS_DEBUG
1376  std::cout << "File is actually " << st.st_size << " bytes seen from process " << myrank << std::endl;
1377 #endif
1378  }
1379 
1380  FILE *ffinal;
1381  if ((ffinal = fopen(filename.c_str(), "rb+")) == NULL) // then everyone fills it
1382  {
1383  printf("COMBBLAS: Vector output file %s failed to open at process %d\n", filename.c_str(), myrank);
1384  MPI_Abort(commGrid->GetWorld(), NOFILE);
1385  }
1386  fseek (ffinal , bytesuntil , SEEK_SET );
1387  fwrite(text.c_str(),1, bytes[myrank] ,ffinal);
1388  fflush(ffinal);
1389  fclose(ffinal);
1390  delete [] bytes;
1391 }
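// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): ParallelWrite
// above only needs a handler with a save(ostream&, value, index) member, matching the
// calls in the body. The minimal handler below is a hypothetical helper for the sketch,
// not a CombBLAS class.
#ifdef COMBBLAS_DOC_EXAMPLES
struct PlainSaveHandler
{
    void save(std::ostream & os, const double & v, int64_t /*index*/) { os << v; }
};

static void Example_ParallelWrite(FullyDistSpVec<int64_t, double> & v)
{
    // one-based indices, include the index column and the "length nnz" header line
    v.ParallelWrite("output_vector.txt", /*onebased*/ true, PlainSaveHandler(),
                    /*includeindices*/ true, /*includeheader*/ true);
}
#endif
// --------------------------------------------------------------------------------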
1392 
1395 template <class IT, class NT>
1396 template <class HANDLER>
1397 std::ifstream& FullyDistSpVec<IT,NT>::ReadDistribute (std::ifstream& infile, int master, HANDLER handler)
1398 {
1399  IT total_nnz;
1400  MPI_Comm World = commGrid->GetWorld();
1401  int neighs = commGrid->GetSize(); // number of neighbors (including oneself)
1402  int buffperneigh = MEMORYINBYTES / (neighs * (sizeof(IT) + sizeof(NT)));
1403 
1404  int * displs = new int[neighs];
1405  for (int i=0; i<neighs; ++i)
1406  displs[i] = i*buffperneigh;
1407 
1408  int * curptrs = NULL;
1409  int recvcount = 0;
1410  IT * inds = NULL;
1411  NT * vals = NULL;
1412  int rank = commGrid->GetRank();
1413  if(rank == master) // 1 processor only
1414  {
1415  inds = new IT [ buffperneigh * neighs ];
1416  vals = new NT [ buffperneigh * neighs ];
1417  curptrs = new int[neighs];
1418  std::fill_n(curptrs, neighs, 0); // fill with zero
1419  if (infile.is_open())
1420  {
1421  infile.clear();
1422  infile.seekg(0);
1423  IT numrows, numcols;
1424  bool indIsRow = true;
1425  infile >> numrows >> numcols >> total_nnz;
1426  if (numcols == 1)
1427  { // column vector, read vector indices from the row index
1428  indIsRow = true;
1429  glen = numrows;
1430  }
1431  else
1432  { // row vector, read vector indices from the column index
1433  indIsRow = false;
1434  glen = numcols;
1435  }
1436  MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);
1437 
1438  IT tempind;
1439  IT temprow, tempcol;
1440  IT cnz = 0;
1441  while ( (!infile.eof()) && cnz < total_nnz)
1442  {
1443  infile >> temprow >> tempcol;
1444  if (indIsRow)
1445  tempind = temprow;
1446  else
1447  tempind = tempcol;
1448  tempind--;
1449  IT locind;
1450  int rec = Owner(tempind, locind); // recipient (owner) processor (ABAB: But if the length is not set yet, this should be wrong)
1451  inds[ rec * buffperneigh + curptrs[rec] ] = locind;
1452  vals[ rec * buffperneigh + curptrs[rec] ] = handler.read(infile, tempind);
1453  ++ (curptrs[rec]);
1454 
1455  if(curptrs[rec] == buffperneigh || (cnz == (total_nnz-1)) ) // one buffer is full, or file is done !
1456  {
1457  // first, send the receive counts ...
1458  MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);
1459 
1460  // generate space for own recv data ... (use arrays because vector<bool> is crippled, if NT=bool)
1461  IT * tempinds = new IT[recvcount];
1462  NT * tempvals = new NT[recvcount];
1463 
1464  // then, send all buffers that to their recipients ...
1465  MPI_Scatterv(inds, curptrs, displs, MPIType<IT>(), tempinds, recvcount, MPIType<IT>(), master, World);
1466  MPI_Scatterv(vals, curptrs, displs, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), master, World);
1467 
1468  // now push what is ours to tuples
1469  for(IT i=0; i< recvcount; ++i)
1470  {
1471  ind.push_back( tempinds[i] ); // already offset'd by the sender
1472  num.push_back( tempvals[i] );
1473  }
1474 
1475  // reset current pointers so that we can reuse {inds,vals} buffers
1476  std::fill_n(curptrs, neighs, 0);
1477  DeleteAll(tempinds, tempvals);
1478  }
1479  ++ cnz;
1480  }
1481  assert (cnz == total_nnz);
1482 
1483  // Signal the end of file to other processors along the diagonal
1484  std::fill_n(curptrs, neighs, std::numeric_limits<int>::max());
1485  MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);
1486  }
1487  else // input file does not exist !
1488  {
1489  glen = 0;
1490  MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);
1491  }
1492  DeleteAll(inds,vals, curptrs);
1493  }
1494  else // all other processors
1495  {
1496  MPI_Bcast(&glen, 1, MPIType<IT>(), master, World);
1497  while(glen > 0) // otherwise, the input file does not exist
1498  {
1499  // first receive the receive counts ...
1500  MPI_Scatter(curptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, master, World);
1501 
1502  if( recvcount == std::numeric_limits<int>::max())
1503  break;
1504 
1505  // create space for incoming data ...
1506  IT * tempinds = new IT[recvcount];
1507  NT * tempvals = new NT[recvcount];
1508 
1509  // receive actual data ... (first 4 arguments are ignored in the receiver side)
1510  MPI_Scatterv(inds, curptrs, displs, MPIType<IT>(), tempinds, recvcount, MPIType<IT>(), master, World);
1511  MPI_Scatterv(vals, curptrs, displs, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), master, World);
1512 
1513  // now push what is ours to tuples
1514  for(IT i=0; i< recvcount; ++i)
1515  {
1516  ind.push_back( tempinds[i] );
1517  num.push_back( tempvals[i] );
1518  }
1519  DeleteAll(tempinds, tempvals);
1520  }
1521  }
1522  delete [] displs;
1523  MPI_Barrier(World);
1524  return infile;
1525 }
1526 
1527 template <class IT, class NT>
1528 template <class HANDLER>
1529 void FullyDistSpVec<IT,NT>::SaveGathered(std::ofstream& outfile, int master, HANDLER handler, bool printProcSplits)
1530 {
1531  int rank, nprocs;
1532  MPI_Comm World = commGrid->GetWorld();
1533  MPI_Comm_rank(World, &rank);
1534  MPI_Comm_size(World, &nprocs);
1535  MPI_File thefile;
1536 
1537  char _fn[] = "temp_fullydistspvec"; // AL: this is to avoid the problem that C++ string literals are const char* while C string literals are char*, leading to a const warning (technically error, but compilers are tolerant)
1538  int mpi_err = MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
1539  if(mpi_err != MPI_SUCCESS)
1540  {
1541  char mpi_err_str[MPI_MAX_ERROR_STRING];
1542  int mpi_err_strlen;
1543  MPI_Error_string(mpi_err, mpi_err_str, &mpi_err_strlen);
1544  printf("MPI_File_open failed (%s)\n", mpi_err_str);
1545  MPI_Abort(World, 1);
1546  }
1547 
1548  IT * dist = new IT[nprocs];
1549  dist[rank] = getlocnnz();
1550  MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
1551  IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
1552  IT totalLength = TotalLength();
1553  IT totalNNZ = getnnz();
1554 
1555  struct mystruct
1556  {
1557  IT ind;
1558  NT num;
1559  };
1560 
1561  MPI_Datatype datatype;
1562  MPI_Type_contiguous(sizeof(mystruct), MPI_CHAR, &datatype );
1563  MPI_Type_commit(&datatype);
1564  int dsize;
1565  MPI_Type_size(datatype, &dsize);
1566 
1567  // The disp displacement argument specifies the position
1568  // (absolute offset in bytes from the beginning of the file)
1569  char native[] = "native"; // AL: this is to avoid the problem that C++ string literals are const char* while C string literals are char*, leading to a const warning (technically error, but compilers are tolerant)
1570  MPI_File_set_view(thefile, static_cast<MPI_Offset>(sizeuntil) * dsize, datatype, datatype, native, MPI_INFO_NULL);
1571 
1572  int count = ind.size();
1573  mystruct * packed = new mystruct[count];
1574  for(int i=0; i<count; ++i)
1575  {
1576  packed[i].ind = ind[i] + sizeuntil;
1577  packed[i].num = num[i];
1578  }
1579 
1580  mpi_err = MPI_File_write(thefile, packed, count, datatype, MPI_STATUS_IGNORE);
1581  if(mpi_err != MPI_SUCCESS)
1582  {
1583  char mpi_err_str[MPI_MAX_ERROR_STRING];
1584  int mpi_err_strlen;
1585  MPI_Error_string(mpi_err, mpi_err_str, &mpi_err_strlen);
1586  printf("MPI_File_write failed (%s)\n", mpi_err_str);
1587  MPI_Abort(World, 1);
1588  }
1589 
1590  MPI_Barrier(World);
1591  MPI_File_close(&thefile);
1592  delete [] packed;
1593 
1594  // Now let processor-0 read the file and print
1595  if(rank == 0)
1596  {
1597  FILE * f = fopen("temp_fullydistspvec", "r");
1598  if(!f)
1599  {
1600  std::cerr << "Problem reading binary input file\n";
1601  return;
1602  }
1603  IT maxd = *std::max_element(dist, dist+nprocs);
1604  mystruct * data = new mystruct[maxd];
1605 
1606  std::streamsize oldPrecision = outfile.precision();
1607  outfile.precision(21);
1608  outfile << totalLength << "\t1\t" << totalNNZ << std::endl;
1609  for(int i=0; i<nprocs; ++i)
1610  {
1611  // read n_per_proc integers and print them
1612  if (fread(data, dsize, dist[i], f) < static_cast<size_t>(dist[i]))
1613  {
1614  std::cout << "fread 660 failed! attempting to continue..." << std::endl;
1615  }
1616 
1617  if (printProcSplits)
1618  outfile << "Elements stored on proc " << i << ":" << std::endl;
1619 
1620  for (int j = 0; j < dist[i]; j++)
1621  {
1622  outfile << data[j].ind+1 << "\t1\t";
1623  handler.save(outfile, data[j].num, data[j].ind);
1624  outfile << std::endl;
1625  }
1626  }
1627  outfile.precision(oldPrecision);
1628  fclose(f);
1629  remove("temp_fullydistspvec");
1630  delete [] data;
1631  delete [] dist;
1632  } else { delete [] dist; } // non-root ranks must also free the gathered counts
1633  MPI_Barrier(World);
1634 }
1635 
1636 
1637 template <class IT, class NT>
1638 template <typename _Predicate>
1639 IT FullyDistSpVec<IT,NT>::Count(_Predicate pred) const
1640 {
1641  IT local = std::count_if( num.begin(), num.end(), pred );
1642  IT whole = 0;
1643  MPI_Allreduce( &local, &whole, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
1644  return whole;
1645 }
1646 
1647 
1648 template <class IT, class NT>
1649 template <typename _BinaryOperation>
1650 NT FullyDistSpVec<IT,NT>::Reduce(_BinaryOperation __binary_op, NT init) const
1651 {
1652  // std::accumulate returns init for empty sequences
1653  // the semantics are init + num[0] + ... + num[n]
1654  NT localsum = std::accumulate( num.begin(), num.end(), init, __binary_op);
1655 
1656  NT totalsum = init;
1657  MPI_Allreduce( &localsum, &totalsum, 1, MPIType<NT>(), MPIOp<_BinaryOperation, NT>::op(), commGrid->GetWorld());
1658  return totalsum;
1659 }
1660 
1661 template <class IT, class NT>
1662 template <typename OUT, typename _BinaryOperation, typename _UnaryOperation>
1663 OUT FullyDistSpVec<IT,NT>::Reduce(_BinaryOperation __binary_op, OUT default_val, _UnaryOperation __unary_op) const
1664 {
1665  // std::accumulate returns identity for empty sequences
1666  OUT localsum = default_val;
1667 
1668  if (num.size() > 0)
1669  {
1670  typename std::vector< NT >::const_iterator iter = num.begin();
1671  //localsum = __unary_op(*iter);
1672  //iter++;
1673  while (iter < num.end())
1674  {
1675  localsum = __binary_op(localsum, __unary_op(*iter));
1676  iter++;
1677  }
1678  }
1679 
1680  OUT totalsum = default_val;
1681  MPI_Allreduce( &localsum, &totalsum, 1, MPIType<OUT>(), MPIOp<_BinaryOperation, OUT>::op(), commGrid->GetWorld());
1682  return totalsum;
1683 }
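// --------------------------------------------------------------------------------
// Usage sketch (documentation aid only, not part of the library build): Count and the
// two Reduce overloads above are collectives over the nonzeros only (structural zeros do
// not participate). The functors below are placeholders; CombBLAS maps std::plus to an
// MPI_Op through MPIOp.
#ifdef COMBBLAS_DOC_EXAMPLES
static void Example_Reductions(const FullyDistSpVec<int64_t, double> & v)
{
    int64_t npos  = v.Count([](double x) { return x > 0.0; });   // number of positive nonzeros
    double  total = v.Reduce(std::plus<double>(), 0.0);          // sum of nonzeros
    double  sumsq = v.Reduce(std::plus<double>(), 0.0,
                             [](double x) { return x * x; });    // sum of squares via the unary overload
    (void) npos; (void) total; (void) sumsq;
}
#endif
// --------------------------------------------------------------------------------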
1684 
1685 template <class IT, class NT>
1686 void FullyDistSpVec<IT,NT>::PrintInfo(std::string vectorname) const
1687 {
1688  IT nznz = getnnz();
1689  if (commGrid->GetRank() == 0)
1690  std::cout << "As a whole, " << vectorname << " has: " << nznz << " nonzeros and length " << glen << std::endl;
1691 }
1692 
1693 template <class IT, class NT>
1694 void FullyDistSpVec<IT,NT>::DebugPrint()
1695 {
1696  int rank, nprocs;
1697  MPI_Comm World = commGrid->GetWorld();
1698  MPI_Comm_rank(World, &rank);
1699  MPI_Comm_size(World, &nprocs);
1700  MPI_File thefile;
1701 
1702  char tfilename[32] = "temp_fullydistspvec";
1703  MPI_File_open(World, tfilename, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
1704 
1705  IT * dist = new IT[nprocs];
1706  dist[rank] = getlocnnz();
1707  MPI_Allgather(MPI_IN_PLACE, 1, MPIType<IT>(), dist, 1, MPIType<IT>(), World);
1708  IT sizeuntil = std::accumulate(dist, dist+rank, static_cast<IT>(0));
1709 
1710  struct mystruct
1711  {
1712  IT ind;
1713  NT num;
1714  };
1715 
1716  MPI_Datatype datatype;
1717  MPI_Type_contiguous(sizeof(mystruct), MPI_CHAR, &datatype );
1718  MPI_Type_commit(&datatype);
1719  int dsize;
1720  MPI_Type_size(datatype, &dsize);
1721 
1722  // The disp displacement argument specifies the position
1723  // (absolute offset in bytes from the beginning of the file)
1724  char openmode[32] = "native";
1725  MPI_File_set_view(thefile, static_cast<MPI_Offset>(sizeuntil) * dsize, datatype, datatype, openmode, MPI_INFO_NULL);
1726 
1727  int count = ind.size();
1728  mystruct * packed = new mystruct[count];
1729  for(int i=0; i<count; ++i)
1730  {
1731  packed[i].ind = ind[i];
1732  packed[i].num = num[i];
1733  }
1734  MPI_File_write(thefile, packed, count, datatype, MPI_STATUS_IGNORE);
1735  MPI_Barrier(World);
1736  MPI_File_close(&thefile);
1737  delete [] packed;
1738 
1739  // Now let processor-0 read the file and print
1740  if(rank == 0)
1741  {
1742  FILE * f = fopen("temp_fullydistspvec", "r");
1743  if(!f)
1744  {
1745  std::cerr << "Problem reading binary input file\n";
1746  return;
1747  }
1748  IT maxd = *std::max_element(dist, dist+nprocs);
1749  mystruct * data = new mystruct[maxd];
1750 
1751  for(int i=0; i<nprocs; ++i)
1752  {
1753  // read n_per_proc integers and print them
1754  if (fread(data, dsize, dist[i],f) < static_cast<size_t>(dist[i]))
1755  {
 1756  std::cout << "fread failed (short read) while printing the vector; attempting to continue..." << std::endl;
1757  }
1758 
1759  std::cout << "Elements stored on proc " << i << ": {";
1760  for (int j = 0; j < dist[i]; j++)
1761  {
1762  std::cout << "(" << data[j].ind << "," << data[j].num << "), ";
1763  }
1764  std::cout << "}" << std::endl;
1765  }
1766  fclose(f);
1767  remove("temp_fullydistspvec");
1768  delete [] data;
1769  delete [] dist;
1770  }
1771  MPI_Barrier(World);
1772 }
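/*
  Behavior sketch for DebugPrint (illustrative): every processor writes its
  local (index, value) pairs into the scratch file "temp_fullydistspvec" with
  MPI-IO, then rank 0 reads the file back, prints one
  "Elements stored on proc i: {...}" line per processor, and deletes the file.
  It is a collective call:

      v.DebugPrint();   // must be called by all ranks of the grid
*/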
1773 
1774 template <class IT, class NT>
1775 void FullyDistSpVec<IT,NT>::Reset()
1776 {
1777  ind.resize(0);
1778  num.resize(0);
1779 }
1780 
1781 // Bulk-assigns the given (already sorted) index array; note that the same array is copied into both ind and num
1782 template <class IT, class NT>
1783 void FullyDistSpVec<IT,NT>::BulkSet(IT inds[], int count) {
1784  ind.resize(count);
1785  num.resize(count);
1786  std::copy(inds, inds+count, ind.data());
1787  std::copy(inds, inds+count, num.data());
1788 }
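/*
  Usage sketch for BulkSet (illustrative, assuming IT is int64_t). The array
  must be sorted; the current implementation copies it into both ind and num,
  so each retained nonzero gets its own index as its value:

      std::vector<int64_t> locinds = {0, 3, 7};        // assumed local, sorted
      v.BulkSet(locinds.data(), (int) locinds.size());
*/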
1789 
1790 
1791 
1792 /*
 1793  ** Create a new sparse vector vout by swapping the indices and values of a sparse vector vin.
 1794  ** The length of vout is globallen, which must be greater than the maximum entry of vin.
1795  ** nnz(vin) = nnz(vout)
1796  ** for every nonzero entry vin[k]: vout[vin[k]] = k
1797  */
1798 /*
1799 template <class IT, class NT>
1800 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::Invert (IT globallen)
1801 {
1802  FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
1803  IT max_entry = Reduce(maximum<IT>(), (IT) 0 ) ;
1804  if(max_entry >= globallen)
1805  {
1806  cout << "Sparse vector has entries (" << max_entry << ") larger than requested global vector length " << globallen << endl;
1807  return Inverted;
1808  }
1809 
1810 
1811  int nprocs = commGrid->GetSize();
1812  vector< vector< NT > > datsent(nprocs);
1813  vector< vector< IT > > indsent(nprocs);
1814 
1815  IT ploclen = getlocnnz();
1816  for(IT k=0; k < ploclen; ++k)
1817  {
1818  IT locind;
1819  int owner = Inverted.Owner(num[k], locind); // numerical values in rhs are 0-based indices
1820  IT gind = ind[k] + LengthUntil();
1821  datsent[owner].push_back(gind);
 1822  indsent[owner].push_back(locind); // so that no correction is needed at the recipient
1823  }
1824  int * sendcnt = new int[nprocs];
1825  int * sdispls = new int[nprocs];
1826  for(int i=0; i<nprocs; ++i)
1827  sendcnt[i] = (int) datsent[i].size();
1828 
1829  int * rdispls = new int[nprocs];
1830  int * recvcnt = new int[nprocs];
1831  MPI_Comm World = commGrid->GetWorld();
1832  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the request counts
1833  sdispls[0] = 0;
1834  rdispls[0] = 0;
1835  for(int i=0; i<nprocs-1; ++i)
1836  {
1837  sdispls[i+1] = sdispls[i] + sendcnt[i];
1838  rdispls[i+1] = rdispls[i] + recvcnt[i];
1839  }
1840  NT * datbuf = new NT[ploclen];
1841  for(int i=0; i<nprocs; ++i)
1842  {
1843  copy(datsent[i].begin(), datsent[i].end(), datbuf+sdispls[i]);
1844  vector<NT>().swap(datsent[i]);
1845  }
1846  IT * indbuf = new IT[ploclen];
1847  for(int i=0; i<nprocs; ++i)
1848  {
1849  copy(indsent[i].begin(), indsent[i].end(), indbuf+sdispls[i]);
1850  vector<IT>().swap(indsent[i]);
1851  }
1852  IT totrecv = accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
1853  NT * recvdatbuf = new NT[totrecv];
1854  MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
1855  delete [] datbuf;
1856 
1857  IT * recvindbuf = new IT[totrecv];
1858  MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
1859  delete [] indbuf;
1860 
1861 
1862  vector< pair<IT,NT> > tosort; // in fact, tomerge would be a better name but it is unlikely to be faster
1863 
1864  for(int i=0; i<nprocs; ++i)
1865  {
1866  for(int j = rdispls[i]; j < rdispls[i] + recvcnt[i]; ++j) // fetch the numerical values
1867  {
1868  tosort.push_back(make_pair(recvindbuf[j], recvdatbuf[j]));
1869  }
1870  }
1871  DeleteAll(recvindbuf, recvdatbuf);
1872  DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
1873  std::sort(tosort.begin(), tosort.end());
1874 
1875  IT lastIndex=-1;
1876  for(typename vector<pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); ++itr)
1877  {
1878  if(lastIndex!=itr->first) // avoid duplicate indices
1879  {
1880  Inverted.ind.push_back(itr->first);
1881  Inverted.num.push_back(itr->second);
1882  }
1883  lastIndex = itr->first;
1884 
1885  }
1886  return Inverted;
1887 
1888 }
1889 */
1890 
1891 
1892 
1893 /*
 1894  ** Create a new sparse vector vout by swapping the indices and values of a sparse vector vin.
 1895  ** The length of vout is globallen, which must be greater than the maximum entry of vin.
1896  ** nnz(vin) = nnz(vout)
1897  ** for every nonzero entry vin[k]: vout[vin[k]] = k
1898  */
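/*
  A small worked example of the specification above (purely illustrative):
  if vin has nonzeros vin[2] = 5 and vin[7] = 3, then vin.Invert(10) returns
  a sparse vector vout of length 10 with vout[5] = 2 and vout[3] = 7. Input
  entries that share the same value map to the same output index; the
  implementation below keeps only one of them, namely the entry that
  originates from the smallest input index.
*/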
1899 
1900 template <class IT, class NT>
1901 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::Invert (IT globallen)
1902 {
1903  FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
1904  IT max_entry = Reduce(maximum<IT>(), (IT) 0 ) ;
1905  if(max_entry >= globallen)
1906  {
1907  std::cout << "Sparse vector has entries (" << max_entry << ") larger than requested global vector length " << globallen << std::endl;
1908  return Inverted;
1909  }
1910 
1911  MPI_Comm World = commGrid->GetWorld();
1912  int nprocs = commGrid->GetSize();
1913  int * rdispls = new int[nprocs+1];
1914  int * recvcnt = new int[nprocs];
1915  int * sendcnt = new int[nprocs](); // initialize to 0
1916  int * sdispls = new int[nprocs+1];
1917 
1918 
1919  IT ploclen = getlocnnz();
1920 #ifdef _OPENMP
1921 #pragma omp parallel for
1922 #endif
1923  for(IT k=0; k < ploclen; ++k)
1924  {
1925  IT locind;
1926  int owner = Inverted.Owner(num[k], locind);
1927 #ifdef _OPENMP
1928  __sync_fetch_and_add(&sendcnt[owner], 1);
1929 #else
1930  sendcnt[owner]++;
1931 #endif
1932  }
1933 
1934 
1935  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World); // share the request counts
1936 
1937  sdispls[0] = 0;
1938  rdispls[0] = 0;
1939  for(int i=0; i<nprocs; ++i)
1940  {
1941  sdispls[i+1] = sdispls[i] + sendcnt[i];
1942  rdispls[i+1] = rdispls[i] + recvcnt[i];
1943  }
1944 
1945 
1946 
1947  NT * datbuf = new NT[ploclen];
1948  IT * indbuf = new IT[ploclen];
1949  int *count = new int[nprocs](); //current position
1950 #ifdef _OPENMP
1951 #pragma omp parallel for
1952 #endif
1953  for(IT i=0; i < ploclen; ++i)
1954  {
1955  IT locind;
1956  int owner = Inverted.Owner(num[i], locind);
1957  int id;
1958 #ifdef _OPENMP
1959  id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
1960 #else
1961  id = sdispls[owner] + count[owner];
1962  count[owner]++;
1963 #endif
1964  datbuf[id] = ind[i] + LengthUntil();
1965  indbuf[id] = locind;
1966  }
1967  delete [] count;
1968 
1969 
1970  IT totrecv = rdispls[nprocs];
1971  NT * recvdatbuf = new NT[totrecv];
1972  MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
1973  delete [] datbuf;
1974 
1975  IT * recvindbuf = new IT[totrecv];
1976  MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
1977  delete [] indbuf;
1978 
1979 
1980  std::vector< std::pair<IT,NT> > tosort;
1981  tosort.resize(totrecv);
1982 #ifdef _OPENMP
1983 #pragma omp parallel for
1984 #endif
 1985  for(IT i=0; i<totrecv; ++i)
1986  {
1987  tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
1988  }
1989  DeleteAll(recvindbuf, recvdatbuf);
1990  DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
1991 
1992 #if defined(GNU_PARALLEL) && defined(_OPENMP)
1993  __gnu_parallel::sort(tosort.begin(), tosort.end());
1994 #else
1995  std::sort(tosort.begin(), tosort.end());
1996 #endif
1997 
1998  Inverted.ind.reserve(totrecv);
1999  Inverted.num.reserve(totrecv);
2000  IT lastIndex=-1;
2001 
2002  // not threaded because Inverted.ind is kept sorted
2003  for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); ++itr)
2004  {
2005  if(lastIndex!=itr->first) // avoid duplicate indices
2006  {
2007  Inverted.ind.push_back(itr->first);
2008  Inverted.num.push_back(itr->second);
2009  }
2010  lastIndex = itr->first;
2011 
2012  }
2013 
2014  return Inverted;
2015 
2016 }
2017 
2018 
2019 
2020 /*
2021  // generalized invert taking binary operations to define index and values of the inverted vector
2022  // _BinaryOperationDuplicate: function to reduce duplicate entries
2023  */
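/*
  Usage sketch for the generalized Invert (illustrative; "v" is an assumed
  FullyDistSpVec<int64_t,int64_t> and "newlen" an assumed target length).
  Both binary functors receive (value, global_index) of a nonzero and return,
  respectively, the index and the value of the corresponding entry in the
  inverted vector; the third functor reduces entries that collide on the same
  index:

      FullyDistSpVec<int64_t,int64_t> inv =
          v.Invert(newlen,
                   [](int64_t val, int64_t idx) { return val; },         // new index
                   [](int64_t val, int64_t idx) { return idx; },         // new value
                   [](int64_t a, int64_t b) { return a < b ? a : b; });  // duplicates

  With these particular functors the call reproduces the plain Invert above,
  keeping the smallest original index among duplicates.
*/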
2024 
2025 template <class IT, class NT>
2026 template <typename _BinaryOperationIdx, typename _BinaryOperationVal, typename _BinaryOperationDuplicate>
2027 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::Invert (IT globallen, _BinaryOperationIdx __binopIdx, _BinaryOperationVal __binopVal, _BinaryOperationDuplicate __binopDuplicate)
2028 
2029 {
2030 
2031  FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
2032 
2033 
2034  // identify the max index in the composed vector
2035  IT localmax = (IT) 0;
2036  for(size_t k=0; k < num.size(); ++k)
2037  {
2038  localmax = std::max(localmax, __binopIdx(num[k], ind[k] + LengthUntil()));
2039  }
2040  IT globalmax = (IT) 0;
2041  MPI_Allreduce( &localmax, &globalmax, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
2042 
2043  if(globalmax >= globallen)
2044  {
2045  std::cout << "Sparse vector has entries (" << globalmax << ") larger than requested global vector length " << globallen << std::endl;
2046  return Inverted;
2047  }
2048 
2049 
2050 
2051  MPI_Comm World = commGrid->GetWorld();
2052  int nprocs = commGrid->GetSize();
2053  int * rdispls = new int[nprocs+1];
2054  int * recvcnt = new int[nprocs];
2055  int * sendcnt = new int[nprocs](); // initialize to 0
2056  int * sdispls = new int[nprocs+1];
2057 
2058 
2059  IT ploclen = getlocnnz();
2060 #ifdef _OPENMP
2061 #pragma omp parallel for
2062 #endif
2063  for(IT k=0; k < ploclen; ++k)
2064  {
2065  IT locind;
2066  IT globind = __binopIdx(num[k], ind[k] + LengthUntil()); // get global index of the inverted vector
2067  int owner = Inverted.Owner(globind, locind);
2068 
2069 #ifdef _OPENMP
2070  __sync_fetch_and_add(&sendcnt[owner], 1);
2071 #else
2072  sendcnt[owner]++;
2073 #endif
2074  }
2075 
2076 
2077  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
2078 
2079  sdispls[0] = 0;
2080  rdispls[0] = 0;
2081  for(int i=0; i<nprocs; ++i)
2082  {
2083  sdispls[i+1] = sdispls[i] + sendcnt[i];
2084  rdispls[i+1] = rdispls[i] + recvcnt[i];
2085  }
2086 
2087 
2088  NT * datbuf = new NT[ploclen];
2089  IT * indbuf = new IT[ploclen];
2090  int *count = new int[nprocs](); //current position
2091 #ifdef _OPENMP
2092 #pragma omp parallel for
2093 #endif
2094  for(IT i=0; i < ploclen; ++i)
2095  {
2096  IT locind;
2097  IT globind = __binopIdx(num[i], ind[i] + LengthUntil()); // get global index of the inverted vector
2098  int owner = Inverted.Owner(globind, locind);
2099  int id;
2100 #ifdef _OPENMP
2101  id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
2102 #else
2103  id = sdispls[owner] + count[owner];
2104  count[owner]++;
2105 #endif
2106  datbuf[id] = __binopVal(num[i], ind[i] + LengthUntil());
2107  indbuf[id] = locind;
2108  }
2109  delete [] count;
2110 
2111 
2112  IT totrecv = rdispls[nprocs];
2113  NT * recvdatbuf = new NT[totrecv];
2114  MPI_Alltoallv(datbuf, sendcnt, sdispls, MPIType<NT>(), recvdatbuf, recvcnt, rdispls, MPIType<NT>(), World);
2115  delete [] datbuf;
2116 
2117  IT * recvindbuf = new IT[totrecv];
2118  MPI_Alltoallv(indbuf, sendcnt, sdispls, MPIType<IT>(), recvindbuf, recvcnt, rdispls, MPIType<IT>(), World);
2119  delete [] indbuf;
2120 
2121 
2122  std::vector< std::pair<IT,NT> > tosort;
2123  tosort.resize(totrecv);
2124 #ifdef _OPENMP
2125 #pragma omp parallel for
2126 #endif
 2127  for(IT i=0; i<totrecv; ++i)
2128  {
2129  tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
2130  }
2131  DeleteAll(recvindbuf, recvdatbuf);
2132  DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
2133 
2134 #if defined(GNU_PARALLEL) && defined(_OPENMP)
2135  __gnu_parallel::sort(tosort.begin(), tosort.end());
2136 #else
2137  std::sort(tosort.begin(), tosort.end());
2138 #endif
2139 
2140  Inverted.ind.reserve(totrecv);
2141  Inverted.num.reserve(totrecv);
2142 
2143 
2144  // not threaded because Inverted.ind is kept sorted
2145  for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); )
2146  {
2147  IT ind = itr->first;
2148  NT val = itr->second;
2149  ++itr;
2150 
2151  while(itr != tosort.end() && itr->first == ind)
2152  {
2153  val = __binopDuplicate(val, itr->second);
2154  ++itr;
2155  }
2156 
2157 
2158  Inverted.ind.push_back(ind);
2159  Inverted.num.push_back(val);
2160 
2161  }
2162 
2163  return Inverted;
2164 
2165 }
2166 
2167 
2168 // Invert using RMA
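// This variant computes the same result as the MPI_Alltoallv-based Invert
// above, but moves all data with one-sided communication: send counts are
// deposited into a window exposed over recvcnt, receive-side displacements
// into a second window (rmadispls), and finally the packed value/index
// buffers are MPI_Put directly into windows exposed over recvdatbuf and
// recvindbuf, each put inside a lock/unlock epoch on the target. Duplicate
// indices are resolved as in the plain Invert: after sorting, only the first
// pair per index is kept.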
2169 
2170 template <class IT, class NT>
2171 template <typename _BinaryOperationIdx, typename _BinaryOperationVal>
2172 FullyDistSpVec<IT,NT> FullyDistSpVec<IT,NT>::InvertRMA (IT globallen, _BinaryOperationIdx __binopIdx, _BinaryOperationVal __binopVal)
2173 
2174 {
2175 
2176  FullyDistSpVec<IT,NT> Inverted(commGrid, globallen);
2177  int myrank;
2178  MPI_Comm_rank(commGrid->GetWorld(), &myrank);
2179 
2180  // identify the max index in the composed vector
2181  IT localmax = (IT) 0;
2182  for(size_t k=0; k < num.size(); ++k)
2183  {
2184  localmax = std::max(localmax, __binopIdx(num[k], ind[k] + LengthUntil()));
2185  }
2186  IT globalmax = (IT) 0;
2187  MPI_Allreduce( &localmax, &globalmax, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
2188 
2189  if(globalmax >= globallen)
2190  {
2191  std::cout << "Sparse vector has entries (" << globalmax << ") larger than requested global vector length " << globallen << std::endl;
2192  return Inverted;
2193  }
2194 
2195 
2196 
2197  MPI_Comm World = commGrid->GetWorld();
2198  int nprocs = commGrid->GetSize();
2199  int * rdispls = new int[nprocs+1];
2200  int * recvcnt = new int[nprocs](); // initialize to 0
2201  int * sendcnt = new int[nprocs](); // initialize to 0
2202  int * sdispls = new int[nprocs+1];
2203 
2204 
2205  IT ploclen = getlocnnz();
2206 #ifdef _OPENMP
2207 #pragma omp parallel for
2208 #endif
2209  for(IT k=0; k < ploclen; ++k)
2210  {
2211  IT locind;
2212  IT globind = __binopIdx(num[k], ind[k] + LengthUntil()); // get global index of the inverted vector
2213  int owner = Inverted.Owner(globind, locind);
2214 
2215 #ifdef _OPENMP
2216  __sync_fetch_and_add(&sendcnt[owner], 1);
2217 #else
2218  sendcnt[owner]++;
2219 #endif
2220  }
2221 
2222 
2223  MPI_Win win2;
 2224  MPI_Win_create(recvcnt, nprocs * sizeof(int), sizeof(int), MPI_INFO_NULL, World, &win2);
2225  for(int i=0; i<nprocs; ++i)
2226  {
2227  if(sendcnt[i]>0)
2228  {
2229  MPI_Win_lock(MPI_LOCK_SHARED,i,MPI_MODE_NOCHECK,win2);
2230  MPI_Put(&sendcnt[i], 1, MPI_INT, i, myrank, 1, MPI_INT, win2);
2231  MPI_Win_unlock(i, win2);
2232  }
2233  }
2234  MPI_Win_free(&win2);
2235 
2236 
2237 
2238  sdispls[0] = 0;
2239  rdispls[0] = 0;
2240  for(int i=0; i<nprocs; ++i)
2241  {
2242  sdispls[i+1] = sdispls[i] + sendcnt[i];
2243  rdispls[i+1] = rdispls[i] + recvcnt[i];
2244  }
2245 
2246  int * rmadispls = new int[nprocs+1];
2247  MPI_Win win3;
 2248  MPI_Win_create(rmadispls, nprocs * sizeof(int), sizeof(int), MPI_INFO_NULL, World, &win3);
2249  for(int i=0; i<nprocs; ++i)
2250  {
2251  if(recvcnt[i]>0)
2252  {
2253  MPI_Win_lock(MPI_LOCK_SHARED,i,MPI_MODE_NOCHECK,win3);
2254  MPI_Put(&rdispls[i], 1, MPI_INT, i, myrank, 1, MPI_INT, win3);
2255  MPI_Win_unlock(i, win3);
2256  }
2257  }
2258  MPI_Win_free(&win3);
2259 
2260 
2261  NT * datbuf = new NT[ploclen];
2262  IT * indbuf = new IT[ploclen];
2263  int *count = new int[nprocs](); //current position
2264 #ifdef _OPENMP
2265 #pragma omp parallel for
2266 #endif
2267  for(IT i=0; i < ploclen; ++i)
2268  {
2269  IT locind;
2270  IT globind = __binopIdx(num[i], ind[i] + LengthUntil()); // get global index of the inverted vector
2271  int owner = Inverted.Owner(globind, locind);
2272  int id;
2273 #ifdef _OPENMP
2274  id = sdispls[owner] + __sync_fetch_and_add(&count[owner], 1);
2275 #else
2276  id = sdispls[owner] + count[owner];
2277  count[owner]++;
2278 #endif
2279  datbuf[id] = __binopVal(num[i], ind[i] + LengthUntil());
2280  indbuf[id] = locind;
2281  }
2282  delete [] count;
2283 
2284 
2285  IT totrecv = rdispls[nprocs];
2286  NT * recvdatbuf = new NT[totrecv];
2287  IT * recvindbuf = new IT[totrecv];
2288  MPI_Win win, win1;
2289  MPI_Win_create(recvdatbuf, totrecv * sizeof(NT), sizeof(NT), MPI_INFO_NULL, commGrid->GetWorld(), &win);
2290  MPI_Win_create(recvindbuf, totrecv * sizeof(IT), sizeof(IT), MPI_INFO_NULL, commGrid->GetWorld(), &win1);
2291  //MPI_Win_fence(0, win);
2292  //MPI_Win_fence(0, win1);
2293  for(int i=0; i<nprocs; ++i)
2294  {
2295  if(sendcnt[i]>0)
2296  {
2297  MPI_Win_lock(MPI_LOCK_SHARED, i, 0, win);
2298  MPI_Put(&datbuf[sdispls[i]], sendcnt[i], MPIType<NT>(), i, rmadispls[i], sendcnt[i], MPIType<NT>(), win);
2299  MPI_Win_unlock(i, win);
2300 
2301  MPI_Win_lock(MPI_LOCK_SHARED, i, 0, win1);
2302  MPI_Put(&indbuf[sdispls[i]], sendcnt[i], MPIType<IT>(), i, rmadispls[i], sendcnt[i], MPIType<IT>(), win1);
2303  MPI_Win_unlock(i, win1);
2304  }
2305  }
2306  //MPI_Win_fence(0, win);
2307  //MPI_Win_fence(0, win1);
2308  MPI_Win_free(&win);
2309  MPI_Win_free(&win1);
2310 
2311  delete [] datbuf;
2312  delete [] indbuf;
2313  delete [] rmadispls;
2314 
2315 
2316  std::vector< std::pair<IT,NT> > tosort;
2317  tosort.resize(totrecv);
2318 #ifdef _OPENMP
2319 #pragma omp parallel for
2320 #endif
 2321  for(IT i=0; i<totrecv; ++i)
2322  {
2323  tosort[i] = std::make_pair(recvindbuf[i], recvdatbuf[i]);
2324  }
2325  DeleteAll(recvindbuf, recvdatbuf);
2326  DeleteAll(sdispls, rdispls, sendcnt, recvcnt);
2327 
2328 #if defined(GNU_PARALLEL) && defined(_OPENMP)
2329  __gnu_parallel::sort(tosort.begin(), tosort.end());
2330 #else
2331  std::sort(tosort.begin(), tosort.end());
2332 #endif
2333 
2334  Inverted.ind.reserve(totrecv);
2335  Inverted.num.reserve(totrecv);
2336  IT lastIndex=-1;
2337 
2338  // not threaded because Inverted.ind is kept sorted
2339  for(typename std::vector<std::pair<IT,NT>>::iterator itr = tosort.begin(); itr != tosort.end(); ++itr)
2340  {
2341  if(lastIndex!=itr->first) // avoid duplicate indices
2342  {
2343  Inverted.ind.push_back(itr->first);
2344  Inverted.num.push_back(itr->second);
2345  }
2346  lastIndex = itr->first;
2347 
2348  }
2349 
2350  return Inverted;
2351 
2352 }
2353 
2354 
2355 
2356 template <typename IT, typename NT>
2357 template <typename NT1, typename _UnaryOperation>
2358 void FullyDistSpVec<IT,NT>::Select (const FullyDistVec<IT,NT1> & denseVec, _UnaryOperation __unop)
2359 {
2360  if(*commGrid == *(denseVec.commGrid))
2361  {
2362  if(TotalLength() != denseVec.TotalLength())
2363  {
2364  std::ostringstream outs;
2365  outs << "Vector dimensions don't match (" << TotalLength() << " vs " << denseVec.TotalLength() << ") for Select\n";
2366  SpParHelper::Print(outs.str());
2367  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2368  }
2369  else
2370  {
2371 
2372  IT spsize = getlocnnz();
2373  IT k = 0;
2374  // iterate over the sparse vector
2375  for(IT i=0; i< spsize; ++i)
2376  {
2377  if(__unop(denseVec.arr[ind[i]]))
2378  {
2379  ind[k] = ind[i];
2380  num[k++] = num[i];
2381  }
2382  }
2383  ind.resize(k);
2384  num.resize(k);
2385  }
2386  }
2387  else
2388  {
2389  std::ostringstream outs;
2390  outs << "Grids are not comparable for Select" << std::endl;
2391  SpParHelper::Print(outs.str());
2392  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2393  }
2394 }
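/*
  Usage sketch for Select (illustrative; "spvec" is an assumed
  FullyDistSpVec<int64_t,double> and "flags" an assumed FullyDistVec<int64_t,int>
  of the same length on the same grid). A nonzero of the sparse vector is kept
  exactly when the predicate holds for the dense-vector entry at the same index:

      spvec.Select(flags, [](int f) { return f != 0; });
*/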
2395 
2396 
2397 // \todo: Shouldn't this wrap EWiseApply for code maintenance instead?
2398 template <typename IT, typename NT>
2399 template <typename NT1>
2400 void FullyDistSpVec<IT,NT>::Setminus (const FullyDistSpVec<IT,NT1> & other)
2401 {
2402  if(*commGrid == *(other.commGrid))
2403  {
2404  if(TotalLength() != other.TotalLength())
2405  {
2406  std::ostringstream outs;
 2407  outs << "Vector dimensions don't match (" << TotalLength() << " vs " << other.TotalLength() << ") for Setminus\n";
2408  SpParHelper::Print(outs.str());
2409  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2410  }
2411  else
2412  {
2413 
2414  IT mysize = getlocnnz();
2415  IT othersize = other.getlocnnz();
2416  IT k = 0, i=0, j=0;
2417  // iterate over the sparse vector
2418  for(; i< mysize && j < othersize;)
2419  {
2420  if(other.ind[j] == ind[i]) //skip
2421  {
2422  i++; j++;
2423  }
2424  else if(other.ind[j] > ind[i])
2425  {
2426  ind[k] = ind[i];
2427  num[k++] = num[i++];
2428  }
2429  else j++;
2430  }
2431  while(i< mysize)
2432  {
2433  ind[k] = ind[i];
2434  num[k++] = num[i++];
2435  }
2436 
2437  ind.resize(k);
2438  num.resize(k);
2439  }
2440  }
2441  else
2442  {
2443  std::ostringstream outs;
 2444  outs << "Grids are not comparable for Setminus" << std::endl;
2445  SpParHelper::Print(outs.str());
2446  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2447  }
2448 }
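/*
  Usage sketch for Setminus (illustrative; both vectors are assumed to live on
  the same grid and have the same total length). After the call, spvecA keeps
  only the nonzeros whose indices do not appear in spvecB; the values of
  spvecB are ignored, only its index set matters:

      spvecA.Setminus(spvecB);   // spvecA := spvecA minus the indices of spvecB
*/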
2449 
2450 
2451 
2452 template <typename IT, typename NT>
2453 template <typename NT1, typename _UnaryOperation, typename _BinaryOperation>
2454 void FullyDistSpVec<IT,NT>::SelectApply (const FullyDistVec<IT,NT1> & denseVec, _UnaryOperation __unop, _BinaryOperation __binop)
2455 {
2456  if(*commGrid == *(denseVec.commGrid))
2457  {
2458  if(TotalLength() != denseVec.TotalLength())
2459  {
2460  std::ostringstream outs;
 2461  outs << "Vector dimensions don't match (" << TotalLength() << " vs " << denseVec.TotalLength() << ") for SelectApply\n";
2462  SpParHelper::Print(outs.str());
2463  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2464  }
2465  else
2466  {
2467 
2468  IT spsize = getlocnnz();
2469  IT k = 0;
2470  // iterate over the sparse vector
2471  for(IT i=0; i< spsize; ++i)
2472  {
2473  if(__unop(denseVec.arr[ind[i]]))
2474  {
2475  ind[k] = ind[i];
2476  num[k++] = __binop(num[i], denseVec.arr[ind[i]]);
2477  }
2478  }
2479  ind.resize(k);
2480  num.resize(k);
2481  }
2482  }
2483  else
2484  {
2485  std::ostringstream outs;
 2486  outs << "Grids are not comparable for SelectApply" << std::endl;
2487  SpParHelper::Print(outs.str());
2488  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2489  }
2490 }
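/*
  Usage sketch for SelectApply (illustrative; "spvec" is an assumed
  FullyDistSpVec<int64_t,double> and "dvec" an aligned FullyDistVec<int64_t,double>).
  Entries passing the unary test on the dense value are kept, and each kept
  value is replaced by the binary functor applied to (sparse value, dense value):

      spvec.SelectApply(dvec,
                        [](double d) { return d > 0.0; },           // keep where dense > 0
                        [](double s, double d) { return s * d; });  // rescale kept entries
*/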
2491 
2492 
2493 // apply a unary function to each nonzero and return a new vector
2494 // could alternatively be a constructor
2495 /*
2496 template <typename IT, typename NT>
2497 template <typename NT1, typename _UnaryOperation>
2498 FullyDistSpVec<IT,NT1> FullyDistSpVec<IT,NT>::Apply(_UnaryOperation __unop)
2499 {
2500  FullyDistSpVec<IT,NT1> composed(commGrid, TotalLength());
2501  IT spsize = getlocnnz();
2502  for(IT i=0; i< spsize; ++i)
2503  {
2504  composed.ind.push_back(ind[i]);
2505  composed.num.push_back( __unop(num[i]));
2506  }
2507  return composed;
2508 }
2509 */
2510 
2511 
2512 
2513 /* experimental version
2514  */
2515 template <class IT, class NT>
2516 template <typename _UnaryOperation>
2517 void FullyDistSpVec<IT,NT>::FilterByVal (FullyDistSpVec<IT,IT> Selector, _UnaryOperation __unop, bool filterByIndex)
2518 {
2519  if(*commGrid != *(Selector.commGrid))
2520  {
2521  std::ostringstream outs;
 2522  outs << "Grids are not comparable for FilterByVal" << std::endl;
2523  SpParHelper::Print(outs.str());
2524  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2525  }
2526  int nprocs = commGrid->GetSize();
2527  MPI_Comm World = commGrid->GetWorld();
2528 
2529 
2530  int * rdispls = new int[nprocs];
2531  int sendcnt = Selector.ind.size();
2532  int * recvcnt = new int[nprocs];
2533  MPI_Allgather(&sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, World);
2534 
2535  rdispls[0] = 0;
2536  for(int i=0; i<nprocs-1; ++i)
2537  {
2538  rdispls[i+1] = rdispls[i] + recvcnt[i];
2539  }
2540 
2541 
2542  IT * sendbuf = new IT[sendcnt];
2543 #ifdef _OPENMP
2544 #pragma omp parallel for
2545 #endif
2546  for(int k=0; k<sendcnt; k++)
2547  {
2548  if(filterByIndex)
2549  sendbuf[k] = Selector.ind[k] + Selector.LengthUntil();
2550  else
2551  sendbuf[k] = Selector.num[k];
2552  }
2553 
2554  IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
2555 
2556  std::vector<IT> recvbuf;
2557  recvbuf.resize(totrecv);
2558 
2559  MPI_Allgatherv(sendbuf, sendcnt, MPIType<IT>(), recvbuf.data(), recvcnt, rdispls, MPIType<IT>(), World);
2560  delete [] sendbuf;
2561  DeleteAll(rdispls,recvcnt);
2562 
2563  if(!filterByIndex) // need to sort
2564  {
2565 #if defined(GNU_PARALLEL) && defined(_OPENMP)
2566  __gnu_parallel::sort(recvbuf.begin(), recvbuf.end());
2567 #else
2568  std::sort(recvbuf.begin(), recvbuf.end());
2569 #endif
2570  }
2571 
2572  // now perform filter (recvbuf is sorted) // TODO: OpenMP parallel and keep things sorted
2573  IT k=0;
2574 
2575  for(IT i=0; i<num.size(); i++)
2576  {
2577  IT val = __unop(num[i]);
2578  if(!std::binary_search(recvbuf.begin(), recvbuf.end(), val))
2579  {
2580  ind[k] = ind[i];
2581  num[k++] = num[i];
2582  }
2583  }
2584  ind.resize(k);
2585  num.resize(k);
2586 
2587 
2588 }
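/*
  Usage sketch for FilterByVal (illustrative; "spvec" and "sel" are assumed
  FullyDistSpVec<int64_t,int64_t> objects on the same grid). The selector's
  values (or its global indices when filterByIndex is true) are allgathered on
  every processor, and a nonzero of spvec is removed whenever __unop applied
  to its value is found in that gathered set:

      spvec.FilterByVal(sel, [](int64_t x) { return x; }, false);
*/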
2589 
2590 }