COMBINATORIAL_BLAS  1.6
SpParMat.cpp
1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc --------------------------*/
6 /****************************************************************/
7 /*
8  Copyright (c) 2010-2017, The Regents of the University of California
9 
10  Permission is hereby granted, free of charge, to any person obtaining a copy
11  of this software and associated documentation files (the "Software"), to deal
12  in the Software without restriction, including without limitation the rights
13  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  copies of the Software, and to permit persons to whom the Software is
15  furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included in
18  all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  THE SOFTWARE.
27  */
28 
29 
30 
31 #include "SpParMat.h"
32 #include "ParFriends.h"
33 #include "Operations.h"
34 #include "FileHeader.h"
35 extern "C" {
36 #include "mmio.h"
37 }
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 
41 #include <mpi.h>
42 #include <fstream>
43 #include <algorithm>
44 #include <set>
45 #include <stdexcept>
46 
47 namespace combblas {
48 
52 template <class IT, class NT, class DER>
53 SpParMat< IT,NT,DER >::SpParMat (std::ifstream & input, MPI_Comm & world)
54 {
55  assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
56  if(!input.is_open())
57  {
58  perror("Input file doesn't exist\n");
59  exit(-1);
60  }
61  commGrid.reset(new CommGrid(world, 0, 0));
62  spSeq = new DER(); input >> (*spSeq); // allocate the local block before streaming into it
63 }
64 
65 template <class IT, class NT, class DER>
66 SpParMat< IT,NT,DER >::SpParMat (DER * myseq, MPI_Comm & world): spSeq(myseq)
67 {
68  assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
69  commGrid.reset(new CommGrid(world, 0, 0));
70 }
71 
72 template <class IT, class NT, class DER>
73 SpParMat< IT,NT,DER >::SpParMat (DER * myseq, std::shared_ptr<CommGrid> grid): spSeq(myseq)
74 {
75  assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
76  commGrid = grid;
77 }
78 
79 template <class IT, class NT, class DER>
80 SpParMat< IT,NT,DER >::SpParMat (std::shared_ptr<CommGrid> grid)
81 {
82  assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
83  spSeq = new DER();
84  commGrid = grid;
85 }
86 
88 template <class IT, class NT, class DER>
89 SpParMat< IT,NT,DER >::SpParMat ()
90 {
91  SpParHelper::Print("COMBBLAS Warning: It is dangerous to create (matrix) objects without specifying the communicator; are you sure you want to create this object in MPI_COMM_WORLD?\n");
92  assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
93  spSeq = new DER();
94  commGrid.reset(new CommGrid(MPI_COMM_WORLD, 0, 0));
95 }
96 
100 template <class IT, class NT, class DER>
101 SpParMat< IT,NT,DER >::SpParMat (MPI_Comm world)
102 {
103 
104  assert( (sizeof(IT) >= sizeof(typename DER::LocalIT)) );
105  spSeq = new DER();
106  commGrid.reset(new CommGrid(world, 0, 0));
107 }
108 
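For orientation, here is a minimal construction sketch using the constructors above. It is not part of SpParMat.cpp; SpDCCols is the usual CombBLAS sequential storage class, and the umbrella header name is an assumption.

#include <memory>
#include <mpi.h>
#include "CombBLAS/CombBLAS.h"   // assumed umbrella header for SpParMat, SpDCCols, CommGrid

void construction_sketch()
{
    // CommGrid(comm, 0, 0) lets CombBLAS pick a (near-)square sqrt(p) x sqrt(p) grid
    std::shared_ptr<combblas::CommGrid> grid(new combblas::CommGrid(MPI_COMM_WORLD, 0, 0));

    typedef combblas::SpDCCols<int64_t, double> LocalMat;
    combblas::SpParMat<int64_t, double, LocalMat> A(grid);   // empty matrix on that grid
    // ... populate A (e.g. from a Matrix Market file or a distributed triple list) ...
}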
109 template <class IT, class NT, class DER>
110 SpParMat< IT,NT,DER >::~SpParMat ()
111 {
112  if(spSeq != NULL) delete spSeq;
113 }
114 
115 template <class IT, class NT, class DER>
116 void SpParMat< IT,NT,DER >::FreeMemory ()
117 {
118  if(spSeq != NULL) delete spSeq;
119  spSeq = NULL;
120 }
121 
122 
128 template <class IT, class NT, class DER>
129 template <typename VT, typename GIT> // GIT: global index type of vector
130 void SpParMat<IT,NT,DER>::TopKGather(std::vector<NT> & all_medians, std::vector<IT> & nnz_per_col, int & thischunk, int & chunksize,
131  const std::vector<NT> & activemedians, const std::vector<IT> & activennzperc, int itersuntil,
132  std::vector< std::vector<NT> > & localmat, const std::vector<IT> & actcolsmap, std::vector<IT> & klimits,
133  std::vector<IT> & toretain, std::vector<std::vector<std::pair<IT,NT>>> & tmppair, IT coffset, const FullyDistVec<GIT,VT> & rvec) const
134 {
135  int rankincol = commGrid->GetRankInProcCol();
136  int colneighs = commGrid->GetGridRows();
137  int nprocs = commGrid->GetSize();
138  std::vector<double> finalWeightedMedians(thischunk, 0.0);
139 
140  MPI_Gather(activemedians.data() + itersuntil*chunksize, thischunk, MPIType<NT>(), all_medians.data(), thischunk, MPIType<NT>(), 0, commGrid->GetColWorld());
141  MPI_Gather(activennzperc.data() + itersuntil*chunksize, thischunk, MPIType<IT>(), nnz_per_col.data(), thischunk, MPIType<IT>(), 0, commGrid->GetColWorld());
142 
143  if(rankincol == 0)
144  {
145  std::vector<double> columnCounts(thischunk, 0.0);
146  std::vector< std::pair<NT, double> > mediansNweights(colneighs); // (median,weight) pairs [to be reused at each iteration]
147 
148  for(int j = 0; j < thischunk; ++j) // for each column
149  {
150  for(int k = 0; k<colneighs; ++k)
151  {
152  size_t fetchindex = k*thischunk+j;
153  columnCounts[j] += static_cast<double>(nnz_per_col[fetchindex]);
154  }
155  for(int k = 0; k<colneighs; ++k)
156  {
157  size_t fetchindex = k*thischunk+j;
158  mediansNweights[k] = std::make_pair(all_medians[fetchindex], static_cast<double>(nnz_per_col[fetchindex]) / columnCounts[j]);
159  }
160  sort(mediansNweights.begin(), mediansNweights.end()); // sort by median
161 
162  double sumofweights = 0;
163  int k = 0;
164  while( k<colneighs && sumofweights < 0.5)
165  {
166  sumofweights += mediansNweights[k++].second;
167  }
168  finalWeightedMedians[j] = mediansNweights[k-1].first;
169  }
170  }
171  MPI_Bcast(finalWeightedMedians.data(), thischunk, MPIType<double>(), 0, commGrid->GetColWorld());
172 
173  std::vector<IT> larger(thischunk, 0);
174  std::vector<IT> smaller(thischunk, 0);
175  std::vector<IT> equal(thischunk, 0);
176 
177 #ifdef THREADED
178 #pragma omp parallel for
179 #endif
180  for(int j = 0; j < thischunk; ++j) // for each active column
181  {
182  size_t fetchindex = actcolsmap[j+itersuntil*chunksize];
183  for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
184  {
185  // count those above/below/equal to the median
186  if(localmat[fetchindex][k] > finalWeightedMedians[j])
187  larger[j]++;
188  else if(localmat[fetchindex][k] < finalWeightedMedians[j])
189  smaller[j]++;
190  else
191  equal[j]++;
192  }
193  }
194  MPI_Allreduce(MPI_IN_PLACE, larger.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
195  MPI_Allreduce(MPI_IN_PLACE, smaller.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
196  MPI_Allreduce(MPI_IN_PLACE, equal.data(), thischunk, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
197 
198  int numThreads = 1; // default case
199 #ifdef THREADED
200  omp_lock_t lock[nprocs]; // a lock per recipient
201  for (int i=0; i<nprocs; i++)
202  omp_init_lock(&(lock[i]));
203 #pragma omp parallel
204  {
205  numThreads = omp_get_num_threads();
206  }
207 #endif
208 
209  std::vector < std::vector<IT> > perthread2retain(numThreads);
210 
211 #ifdef THREADED
212 #pragma omp parallel for
213 #endif
214  for(int j = 0; j < thischunk; ++j) // for each active column
215  {
216 #ifdef THREADED
217  int myThread = omp_get_thread_num();
218 #else
219  int myThread = 0;
220 #endif
221 
222  // both clmapindex and fetchindex are unique for a given j (hence not shared among threads)
223  size_t clmapindex = j+itersuntil*chunksize; // klimits is of the same length as actcolsmap
224  size_t fetchindex = actcolsmap[clmapindex]; // localmat can only be dereferenced using the original indices.
225 
226  // the following if/else checks are the same on every processor along ColWorld (because the klimits/larger/equal vectors are mirrored)
227  if(klimits[clmapindex] <= larger[j]) // the entries larger than Weighted-Median are plentiful, we can discard all the smaller/equal guys
228  {
229  std::vector<NT> survivors;
230  for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
231  {
232  if(localmat[fetchindex][k] > finalWeightedMedians[j]) // keep only the large guys (even equal guys go)
233  survivors.push_back(localmat[fetchindex][k]);
234  }
235  localmat[fetchindex].swap(survivors);
236  perthread2retain[myThread].push_back(clmapindex); // items to retain in actcolsmap
237  }
238  else if (klimits[clmapindex] > larger[j] + equal[j]) // the elements that are either larger or equal-to are surely keepers, no need to reprocess them
239  {
240  std::vector<NT> survivors;
241  for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
242  {
243  if(localmat[fetchindex][k] < finalWeightedMedians[j]) // keep only the small guys (even equal guys go)
244  survivors.push_back(localmat[fetchindex][k]);
245  }
246  localmat[fetchindex].swap(survivors);
247 
248  klimits[clmapindex] -= (larger[j] + equal[j]); // update the k limit for this column only
249  perthread2retain[myThread].push_back(clmapindex); // items to retain in actcolsmap
250  }
251  else // larger[j] < klimits[clmapindex] && klimits[clmapindex] <= larger[j] + equal[j]
252  { // we will always have equal[j] > 0 because the weighted median is part of the dataset so it has to be equal to itself.
253  std::vector<NT> survivors;
254  for(size_t k = 0; k < localmat[fetchindex].size(); ++k)
255  {
256  if(localmat[fetchindex][k] >= finalWeightedMedians[j]) // keep the larger and equal to guys (might exceed k-limit but that's fine according to MCL)
257  survivors.push_back(localmat[fetchindex][k]);
258  }
259  localmat[fetchindex].swap(survivors);
260 
261  // We found it: the kth largest element in column (coffset + fetchindex) is finalWeightedMedians[j]
262  // But everyone in the same processor column has the information, only one of them should send it
263  IT n_perproc = getlocalcols() / colneighs; // find a typical processor's share
264  int assigned = std::min(static_cast<int>(fetchindex/n_perproc), colneighs-1); // clamp to the last processor row; std::max here would leave some columns unassigned
265  if( assigned == rankincol)
266  {
267  IT locid;
268  int owner = rvec.Owner(coffset + fetchindex, locid);
269 
270  #ifdef THREADED
271  omp_set_lock(&(lock[owner]));
272  #endif
273  tmppair[owner].emplace_back(std::make_pair(locid, finalWeightedMedians[j]));
274  #ifdef THREADED
275  omp_unset_lock(&(lock[owner]));
276  #endif
277  }
278  } // end_else
279  } // end_for
280  // ------ concatenate toretain "indices" processed by threads ------
281  std::vector<IT> tdisp(numThreads+1);
282  tdisp[0] = 0;
283  for(int i=0; i<numThreads; ++i)
284  {
285  tdisp[i+1] = tdisp[i] + perthread2retain[i].size();
286  }
287  toretain.resize(tdisp[numThreads]);
288 
289 #pragma omp parallel for
290  for(int i=0; i< numThreads; i++)
291  {
292  std::copy(perthread2retain[i].data() , perthread2retain[i].data()+ perthread2retain[i].size(), toretain.data() + tdisp[i]);
293  }
294 
295 #ifdef THREADED
296  for (int i=0; i<nprocs; i++) // destroy the locks
297  omp_destroy_lock(&(lock[i]));
298 #endif
299 }
300 
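As a reference for the weighted-median step above (the rankincol == 0 branch), the same selection can be written serially; this sketch is illustrative only and not CombBLAS code.

#include <algorithm>
#include <utility>
#include <vector>

// Pick the weighted median from (median, weight) pairs the same way TopKGather does:
// sort by median, then accumulate weights until the running sum reaches 0.5.
double WeightedMedian(std::vector<std::pair<double,double>> mediansNweights)
{
    std::sort(mediansNweights.begin(), mediansNweights.end());
    double sumofweights = 0.0;
    size_t k = 0;
    while (k < mediansNweights.size() && sumofweights < 0.5)
        sumofweights += mediansNweights[k++].second;
    return mediansNweights[k-1].first;
}
// Example: {(3.0, 0.25), (5.0, 0.50), (9.0, 0.25)} -> 5.0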
301 
307 template <class IT, class NT, class DER>
308 template <typename VT, typename GIT> // GIT: global index type of vector
309 bool SpParMat<IT,NT,DER>::Kselect2(FullyDistVec<GIT,VT> & rvec, IT k_limit) const
310 {
311  if(*rvec.commGrid != *commGrid)
312  {
313  SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
314  MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
315  }
316 
317 
318  int rankincol = commGrid->GetRankInProcCol();
319  int rankinrow = commGrid->GetRankInProcRow();
320  int rowneighs = commGrid->GetGridCols(); // get # of processors on the row
321  int colneighs = commGrid->GetGridRows();
322  int myrank = commGrid->GetRank();
323  int nprocs = commGrid->GetSize();
324 
325  FullyDistVec<GIT,IT> colcnt(commGrid);
326  Reduce(colcnt, Column, std::plus<IT>(), (IT) 0, [](NT i){ return (IT) 1;});
327 
328  // <begin> Gather vector along columns (Logic copied from DimApply)
329  int xsize = (int) colcnt.LocArrSize();
330  int trxsize = 0;
331  int diagneigh = colcnt.commGrid->GetComplementRank();
332  MPI_Status status;
333  MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, commGrid->GetWorld(), &status);
334 
335  IT * trxnums = new IT[trxsize];
336  MPI_Sendrecv(const_cast<IT*>(SpHelper::p2a(colcnt.arr)), xsize, MPIType<IT>(), diagneigh, TRX, trxnums, trxsize, MPIType<IT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
337 
338  int * colsize = new int[colneighs];
339  colsize[rankincol] = trxsize;
340  MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, commGrid->GetColWorld());
341  int * dpls = new int[colneighs](); // displacements (zero initialized pid)
342  std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
343  int accsize = std::accumulate(colsize, colsize+colneighs, 0);
344  std::vector<IT> percsum(accsize); // per column sum of the number of entries
345 
346  MPI_Allgatherv(trxnums, trxsize, MPIType<IT>(), percsum.data(), colsize, dpls, MPIType<IT>(), commGrid->GetColWorld()); // percsum holds IT entries, so both send and receive types must be MPIType<IT>()
347  DeleteAll(trxnums,colsize, dpls);
348  // <end> Gather vector along columns
349 
350  IT locm = getlocalcols(); // length (number of columns) assigned to this processor (and processor column)
351  std::vector< std::vector<NT> > localmat(locm); // some sort of minimal local copy of matrix
352 
353 #ifdef COMBBLAS_DEBUG
354  if(accsize != locm) std::cout << "Gather vector along columns logic is wrong" << std::endl;
355 #endif
356 
357  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
358  {
359  if(percsum[colit.colid()] >= k_limit) // don't make a copy of an inactive column
360  {
361  localmat[colit.colid()].reserve(colit.nnz());
362  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
363  {
364  localmat[colit.colid()].push_back(nzit.value());
365  }
366  }
367  }
368 
369  int64_t activecols = std::count_if(percsum.begin(), percsum.end(), [k_limit](IT i){ return i >= k_limit;});
370  int64_t activennz = std::accumulate(percsum.begin(), percsum.end(), (int64_t) 0);
371 
372  int64_t totactcols, totactnnzs;
373  MPI_Allreduce(&activecols, &totactcols, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
374  if(myrank == 0) std::cout << "Number of initial active columns is " << totactcols << std::endl;
375 
376  MPI_Allreduce(&activennz, &totactnnzs, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
377  if(myrank == 0) std::cout << "Number of initial nonzeros is " << totactnnzs << std::endl;
378 
379 #ifdef COMBBLAS_DEBUG
380  IT glactcols = colcnt.Count([k_limit](IT i){ return i >= k_limit;});
381  if(myrank == 0) std::cout << "Number of initial active columns is " << glactcols << std::endl;
382  if(glactcols != totactcols && myrank == 0) std::cout << "Wrong number of active columns computed" << std::endl;
383 #endif
384 
385 
386  rvec = FullyDistVec<GIT,VT> ( rvec.getcommgrid(), getncol(), std::numeric_limits<NT>::min()); // set length of rvec correctly
387 
388 #ifdef COMBBLAS_DEBUG
389  PrintInfo();
390  rvec.PrintInfo("rvector");
391 #endif
392 
393  if(totactcols == 0)
394  {
395  std::ostringstream ss;
396  ss << "TopK: k_limit (" << k_limit <<")" << " >= maxNnzInColumn. Returning the result of Reduce(Column, minimum<NT>()) instead..." << std::endl;
397  SpParHelper::Print(ss.str());
398  return false; // no prune needed
399  }
400 
401 
402  std::vector<IT> actcolsmap(activecols); // the map that gives the original index of that active column (this map will shrink over iterations)
403  for (IT i=0, j=0; i< locm; ++i) {
404  if(percsum[i] >= k_limit)
405  actcolsmap[j++] = i;
406  }
407 
408  std::vector<NT> all_medians;
409  std::vector<IT> nnz_per_col;
410  std::vector<IT> klimits(activecols, k_limit); // is distributed management of this vector needed?
411  int activecols_lowerbound = 10*colneighs;
412 
413 
414  IT * locncols = new IT[rowneighs];
415  locncols[rankinrow] = locm;
416  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());
417  IT coffset = std::accumulate(locncols, locncols+rankinrow, static_cast<IT>(0));
418  delete [] locncols;
419 
420  /* Create/allocate variables for vector assignment */
421  MPI_Datatype MPI_pair;
422  MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &MPI_pair);
423  MPI_Type_commit(&MPI_pair);
424 
425  int * sendcnt = new int[nprocs];
426  int * recvcnt = new int[nprocs];
427  int * sdispls = new int[nprocs]();
428  int * rdispls = new int[nprocs]();
429 
430  while(totactcols > 0)
431  {
432  int chunksize, iterations, lastchunk;
433  if(activecols > activecols_lowerbound)
434  {
435  // two reasons for chunking:
436  // (1) keep memory limited to activecols (<= n/sqrt(p))
437  // (2) avoid overflow in the send counts
438  chunksize = static_cast<int>(activecols/colneighs); // invariant chunksize >= 10 (by activecols_lowerbound)
439  iterations = std::max(static_cast<int>(activecols/chunksize), 1);
440  lastchunk = activecols - (iterations-1)*chunksize; // lastchunk >= chunksize by construction
441  }
442  else
443  {
444  chunksize = activecols;
445  iterations = 1;
446  lastchunk = activecols;
447  }
448  std::vector<NT> activemedians(activecols); // one per "active" column
449  std::vector<IT> activennzperc(activecols);
450 
451 #ifdef THREADED
452 #pragma omp parallel for
453 #endif
454  for(IT i=0; i< activecols; ++i) // recompute the medians and nnzperc
455  {
456  size_t orgindex = actcolsmap[i]; // assert: no two threads will share the same "orgindex"
457  if(localmat[orgindex].empty())
458  {
459  activemedians[i] = (NT) 0;
460  activennzperc[i] = 0;
461  }
462  else
463  {
464  // this partitions in increasing order, which is fine as long as we only care about the median as opposed to a general nth element
465  auto entriesincol(localmat[orgindex]); // create a temporary vector as nth_element modifies the vector
466  std::nth_element(entriesincol.begin(), entriesincol.begin() + entriesincol.size()/2, entriesincol.end());
467  activemedians[i] = entriesincol[entriesincol.size()/2];
468  activennzperc[i] = entriesincol.size();
469  }
470  }
471 
472  percsum.resize(activecols, 0);
473  MPI_Allreduce(activennzperc.data(), percsum.data(), activecols, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
474  activennz = std::accumulate(percsum.begin(), percsum.end(), (int64_t) 0);
475 
476 #ifdef COMBBLAS_DEBUG
477  MPI_Allreduce(&activennz, &totactnnzs, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
478  if(myrank == 0) std::cout << "Number of active nonzeros is " << totactnnzs << std::endl;
479 #endif
480 
481  std::vector<IT> toretain;
482  if(rankincol == 0)
483  {
484  all_medians.resize(lastchunk*colneighs);
485  nnz_per_col.resize(lastchunk*colneighs);
486  }
487  std::vector< std::vector< std::pair<IT,NT> > > tmppair(nprocs);
488  for(int i=0; i< iterations-1; ++i) // this loop should not be parallelized if we want to keep storage small
489  {
490  TopKGather(all_medians, nnz_per_col, chunksize, chunksize, activemedians, activennzperc, i, localmat, actcolsmap, klimits, toretain, tmppair, coffset, rvec);
491  }
492  TopKGather(all_medians, nnz_per_col, lastchunk, chunksize, activemedians, activennzperc, iterations-1, localmat, actcolsmap, klimits, toretain, tmppair, coffset, rvec);
493 
494  /* Set the newly found vector entries */
495  IT totsend = 0;
496  for(IT i=0; i<nprocs; ++i)
497  {
498  sendcnt[i] = tmppair[i].size();
499  totsend += tmppair[i].size();
500  }
501 
502  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld());
503 
504  std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
505  std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
506  IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
507  assert((totsend < std::numeric_limits<int>::max()));
508  assert((totrecv < std::numeric_limits<int>::max()));
509 
510  std::pair<IT,NT> * sendpair = new std::pair<IT,NT>[totsend];
511  for(int i=0; i<nprocs; ++i)
512  {
513  std::copy(tmppair[i].begin(), tmppair[i].end(), sendpair+sdispls[i]);
514  std::vector< std::pair<IT,NT> >().swap(tmppair[i]); // clear memory
515  }
516  std::vector< std::pair<IT,NT> > recvpair(totrecv);
517  MPI_Alltoallv(sendpair, sendcnt, sdispls, MPI_pair, recvpair.data(), recvcnt, rdispls, MPI_pair, commGrid->GetWorld());
518  delete [] sendpair;
519 
520  IT updated = 0;
521  for(auto & update : recvpair ) // Now, write these to rvec
522  {
523  updated++;
524  rvec.arr[update.first] = update.second;
525  }
526 #ifdef COMBBLAS_DEBUG
527  MPI_Allreduce(MPI_IN_PLACE, &updated, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
528  if(myrank == 0) std::cout << "Total vector entries updated " << updated << std::endl;
529 #endif
530 
531  /* End of setting up the newly found vector entries */
532 
533  std::vector<IT> newactivecols(toretain.size());
534  std::vector<IT> newklimits(toretain.size());
535  IT newindex = 0;
536  for(auto & retind : toretain )
537  {
538  newactivecols[newindex] = actcolsmap[retind];
539  newklimits[newindex++] = klimits[retind];
540  }
541  actcolsmap.swap(newactivecols);
542  klimits.swap(newklimits);
543  activecols = actcolsmap.size();
544 
545  MPI_Allreduce(&activecols, &totactcols, 1, MPIType<int64_t>(), MPI_SUM, commGrid->GetRowWorld());
546 #ifdef COMBBLAS_DEBUG
547  if(myrank == 0) std::cout << "Number of active columns is " << totactcols << std::endl;
548 #endif
549  }
550  DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
551  MPI_Type_free(&MPI_pair);
552 
553 #ifdef COMBBLAS_DEBUG
554  if(myrank == 0) std::cout << "Exiting kselect2"<< std::endl;
555 #endif
556  return true; // prune needed
557 }
558 
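Functionally, Kselect2 produces, for every column with at least k_limit nonzeros, the k-th largest nonzero of that column; the distributed machinery above exists to avoid ever gathering a whole column on one processor. A serial reference for a single column (illustrative only, not CombBLAS code):

#include <algorithm>
#include <functional>
#include <vector>

// k-th largest nonzero of one column (1-based k, requires k <= colvals.size());
// this is the value Kselect2 reports per active column.
double KthLargest(std::vector<double> colvals, size_t k)
{
    std::nth_element(colvals.begin(), colvals.begin() + (k - 1),
                     colvals.end(), std::greater<double>());
    return colvals[k - 1];
}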
559 
560 
561 template <class IT, class NT, class DER>
562 void SpParMat< IT,NT,DER >::Dump(std::string filename) const
563 {
564  MPI_Comm World = commGrid->GetWorld();
565  int rank = commGrid->GetRank();
566  int nprocs = commGrid->GetSize();
567 
568  MPI_File thefile;
569  char * _fn = const_cast<char*>(filename.c_str());
570  MPI_File_open(World, _fn, MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &thefile);
571 
572  int rankinrow = commGrid->GetRankInProcRow();
573  int rankincol = commGrid->GetRankInProcCol();
574  int rowneighs = commGrid->GetGridCols(); // get # of processors on the row
575  int colneighs = commGrid->GetGridRows();
576 
577  IT * colcnts = new IT[rowneighs];
578  IT * rowcnts = new IT[colneighs];
579  rowcnts[rankincol] = getlocalrows();
580  colcnts[rankinrow] = getlocalcols();
581 
582  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), colcnts, 1, MPIType<IT>(), commGrid->GetRowWorld());
583  IT coloffset = std::accumulate(colcnts, colcnts+rankinrow, static_cast<IT>(0));
584 
585  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), rowcnts, 1, MPIType<IT>(), commGrid->GetColWorld());
586  IT rowoffset = std::accumulate(rowcnts, rowcnts+rankincol, static_cast<IT>(0));
587  DeleteAll(colcnts, rowcnts);
588 
589  IT * prelens = new IT[nprocs];
590  prelens[rank] = 2*getlocalnnz();
591  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
592  IT lengthuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));
593 
594  // The disp displacement argument specifies the position
595  // (absolute offset in bytes from the beginning of the file)
596  MPI_Offset disp = lengthuntil * sizeof(uint32_t);
597  char native[] = "native";
598  MPI_File_set_view(thefile, disp, MPI_UNSIGNED, MPI_UNSIGNED, native, MPI_INFO_NULL); // AL: the second-to-last argument is a non-const char* (looks like poor MPI standardization, the C++ bindings list it as const), C++ string literals MUST be const (especially in c++11).
599  uint32_t * gen_edges = new uint32_t[prelens[rank]];
600 
601  IT k = 0;
602  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
603  {
604  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
605  {
606  gen_edges[k++] = (uint32_t) (nzit.rowid() + rowoffset);
607  gen_edges[k++] = (uint32_t) (colit.colid() + coloffset);
608  }
609  }
610  assert(k == prelens[rank]);
611  MPI_File_write(thefile, gen_edges, prelens[rank], MPI_UNSIGNED, MPI_STATUS_IGNORE); // MPI_STATUS_IGNORE is the portable way to discard the status
612  MPI_File_close(&thefile);
613 
614  delete [] prelens;
615  delete [] gen_edges;
616 }
617 
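Dump() therefore writes a headerless binary stream of uint32 pairs, one (global row id, global column id) pair per nonzero, in native byte order. A serial reader sketch for that format (ReadDumpedEdges is a hypothetical helper, not a CombBLAS function):

#include <cstdint>
#include <fstream>
#include <string>
#include <utility>
#include <vector>

// Read back the (row, col) uint32 pairs written by SpParMat::Dump().
std::vector<std::pair<uint32_t, uint32_t>> ReadDumpedEdges(const std::string & filename)
{
    std::ifstream in(filename, std::ios::binary);
    std::vector<std::pair<uint32_t, uint32_t>> edges;
    uint32_t rc[2];   // rc[0] = global row id, rc[1] = global column id
    while (in.read(reinterpret_cast<char*>(rc), sizeof(rc)))
        edges.emplace_back(rc[0], rc[1]);
    return edges;
}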
618 template <class IT, class NT, class DER>
619 SpParMat< IT,NT,DER >::SpParMat (const SpParMat< IT,NT,DER > & rhs)
620 {
621  if(rhs.spSeq != NULL)
622  spSeq = new DER(*(rhs.spSeq)); // Deep copy of local block
623 
624  commGrid = rhs.commGrid;
625 }
626 
627 template <class IT, class NT, class DER>
628 SpParMat< IT,NT,DER > & SpParMat< IT,NT,DER >::operator=(const SpParMat< IT,NT,DER > & rhs)
629 {
630  if(this != &rhs)
631  {
634  if(spSeq != NULL) delete spSeq;
635  if(rhs.spSeq != NULL)
636  spSeq = new DER(*(rhs.spSeq)); // Deep copy of local block
637 
638  commGrid = rhs.commGrid;
639  }
640  return *this;
641 }
642 
643 template <class IT, class NT, class DER>
644 SpParMat< IT,NT,DER > & SpParMat< IT,NT,DER >::operator+=(const SpParMat< IT,NT,DER > & rhs)
645 {
646  if(this != &rhs)
647  {
648  if(*commGrid == *rhs.commGrid)
649  {
650  (*spSeq) += (*(rhs.spSeq));
651  }
652  else
653  {
654  std::cout << "Grids are not comparable for parallel addition (A+B)" << std::endl;
655  }
656  }
657  else
658  {
659  std::cout<< "Missing feature (A+A): Use multiply with 2 instead !"<<std::endl;
660  }
661  return *this;
662 }
663 
664 template <class IT, class NT, class DER>
665 float SpParMat< IT,NT,DER >::LoadImbalance() const
666 {
667  IT totnnz = getnnz(); // collective call
668  IT maxnnz = 0;
669  IT localnnz = (IT) spSeq->getnnz();
670  MPI_Allreduce( &localnnz, &maxnnz, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
671  if(totnnz == 0) return 1;
672  return static_cast<float>((commGrid->GetSize() * maxnnz)) / static_cast<float>(totnnz);
673 }
674 
675 template <class IT, class NT, class DER>
676 IT SpParMat< IT,NT,DER >::getnnz() const
677 {
678  IT totalnnz = 0;
679  IT localnnz = spSeq->getnnz();
680  MPI_Allreduce( &localnnz, &totalnnz, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
681  return totalnnz;
682 }
683 
684 template <class IT, class NT, class DER>
685 IT SpParMat< IT,NT,DER >::getnrow() const
686 {
687  IT totalrows = 0;
688  IT localrows = spSeq->getnrow();
689  MPI_Allreduce( &localrows, &totalrows, 1, MPIType<IT>(), MPI_SUM, commGrid->GetColWorld());
690  return totalrows;
691 }
692 
693 template <class IT, class NT, class DER>
694 IT SpParMat< IT,NT,DER >::getncol() const
695 {
696  IT totalcols = 0;
697  IT localcols = spSeq->getncol();
698  MPI_Allreduce( &localcols, &totalcols, 1, MPIType<IT>(), MPI_SUM, commGrid->GetRowWorld());
699  return totalcols;
700 }
701 
702 template <class IT, class NT, class DER>
703 template <typename _BinaryOperation>
704 void SpParMat<IT,NT,DER>::DimApply(Dim dim, const FullyDistVec<IT, NT>& x, _BinaryOperation __binary_op)
705 {
706 
707  if(!(*commGrid == *(x.commGrid)))
708  {
709  std::cout << "Grids are not comparable for SpParMat::DimApply" << std::endl;
710  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
711  }
712 
713  MPI_Comm World = x.commGrid->GetWorld();
714  MPI_Comm ColWorld = x.commGrid->GetColWorld();
715  MPI_Comm RowWorld = x.commGrid->GetRowWorld();
716  switch(dim)
717  {
718  case Column: // scale each column
719  {
720  int xsize = (int) x.LocArrSize();
721  int trxsize = 0;
722  int diagneigh = x.commGrid->GetComplementRank();
723  MPI_Status status;
724  MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);
725 
726  NT * trxnums = new NT[trxsize];
727  MPI_Sendrecv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), diagneigh, TRX, trxnums, trxsize, MPIType<NT>(), diagneigh, TRX, World, &status);
728 
729  int colneighs, colrank;
730  MPI_Comm_size(ColWorld, &colneighs);
731  MPI_Comm_rank(ColWorld, &colrank);
732  int * colsize = new int[colneighs];
733  colsize[colrank] = trxsize;
734 
735  MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
736  int * dpls = new int[colneighs](); // displacements (zero initialized pid)
737  std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
738  int accsize = std::accumulate(colsize, colsize+colneighs, 0);
739  NT * scaler = new NT[accsize];
740 
741  MPI_Allgatherv(trxnums, trxsize, MPIType<NT>(), scaler, colsize, dpls, MPIType<NT>(), ColWorld);
742  DeleteAll(trxnums,colsize, dpls);
743 
744  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
745  {
746  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
747  {
748  nzit.value() = __binary_op(nzit.value(), scaler[colit.colid()]);
749  }
750  }
751  delete [] scaler;
752  break;
753  }
754  case Row:
755  {
756  int xsize = (int) x.LocArrSize();
757  int rowneighs, rowrank;
758  MPI_Comm_size(RowWorld, &rowneighs);
759  MPI_Comm_rank(RowWorld, &rowrank);
760  int * rowsize = new int[rowneighs];
761  rowsize[rowrank] = xsize;
762  MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, rowsize, 1, MPI_INT, RowWorld);
763  int * dpls = new int[rowneighs](); // displacements (zero initialized pid)
764  std::partial_sum(rowsize, rowsize+rowneighs-1, dpls+1);
765  int accsize = std::accumulate(rowsize, rowsize+rowneighs, 0);
766  NT * scaler = new NT[accsize];
767 
768  MPI_Allgatherv(const_cast<NT*>(SpHelper::p2a(x.arr)), xsize, MPIType<NT>(), scaler, rowsize, dpls, MPIType<NT>(), RowWorld);
769  DeleteAll(rowsize, dpls);
770 
771  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
772  {
773  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
774  {
775  nzit.value() = __binary_op(nzit.value(), scaler[nzit.rowid()]);
776  }
777  }
778  delete [] scaler;
779  break;
780  }
781  default:
782  {
783  std::cout << "Unknown scaling dimension, returning..." << std::endl;
784  break;
785  }
786  }
787 }
788 
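A typical use of DimApply is diagonal (row or column) scaling. The following usage sketch assumes a double-valued matrix stored in SpDCCols and the same assumed umbrella header as the earlier construction sketch:

#include <functional>
#include "CombBLAS/CombBLAS.h"   // assumed umbrella header
using namespace combblas;

typedef SpParMat<int64_t, double, SpDCCols<int64_t, double>> ParMat;

void scale_columns(ParMat & A)
{
    // x holds one factor per global column of A (all 2.0 here);
    // DimApply(Column, ...) then multiplies every stored A(i,j) by x[j].
    FullyDistVec<int64_t, double> x(A.getcommgrid(), A.getncol(), 2.0);
    A.DimApply(Column, x, std::multiplies<double>());
}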
789 template <class IT, class NT, class DER>
790 template <typename _BinaryOperation, typename _UnaryOperation >
791 FullyDistVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id, _UnaryOperation __unary_op) const
792 {
793  IT length;
794  switch(dim)
795  {
796  case Column:
797  {
798  length = getncol();
799  break;
800  }
801  case Row:
802  {
803  length = getnrow();
804  break;
805  }
806  default:
807  {
808  std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
809  break;
810  }
811  }
812  FullyDistVec<IT,NT> parvec(commGrid, length, id);
813  Reduce(parvec, dim, __binary_op, id, __unary_op);
814  return parvec;
815 }
816 
817 template <class IT, class NT, class DER>
818 template <typename _BinaryOperation>
819 FullyDistVec<IT,NT> SpParMat<IT,NT,DER>::Reduce(Dim dim, _BinaryOperation __binary_op, NT id) const
820 {
821  IT length;
822  switch(dim)
823  {
824  case Column:
825  {
826  length = getncol();
827  break;
828  }
829  case Row:
830  {
831  length = getnrow();
832  break;
833  }
834  default:
835  {
836  std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
837  break;
838  }
839  }
840  FullyDistVec<IT,NT> parvec(commGrid, length, id);
841  Reduce(parvec, dim, __binary_op, id, myidentity<NT>()); // myidentity<NT>() is a no-op function
842  return parvec;
843 }
844 
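For example, column sums and per-column nonzero counts can be obtained with the value-returning overloads above (a usage sketch, reusing the ParMat typedef and headers from the DimApply sketch):

void column_statistics(const ParMat & A)
{
    // Sum of the nonzeros in each column; empty columns keep the identity 0.0.
    FullyDistVec<int64_t, double> colsums = A.Reduce(Column, std::plus<double>(), 0.0);

    // Nonzero count per column: map every stored value to 1.0 before adding.
    FullyDistVec<int64_t, double> colnnz =
        A.Reduce(Column, std::plus<double>(), 0.0, [](double) { return 1.0; });
}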
845 
846 template <class IT, class NT, class DER>
847 template <typename VT, typename GIT, typename _BinaryOperation>
848 void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id) const
849 {
850  Reduce(rvec, dim, __binary_op, id, myidentity<NT>() );
851 }
852 
853 
854 template <class IT, class NT, class DER>
855 template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation> // GIT: global index type of vector
856 void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op) const
857 {
858  Reduce(rvec, dim, __binary_op, id, __unary_op, MPIOp<_BinaryOperation, VT>::op() );
859 }
860 
861 
862 template <class IT, class NT, class DER>
863 template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation> // GIT: global index type of vector
864 void SpParMat<IT,NT,DER>::Reduce(FullyDistVec<GIT,VT> & rvec, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op, MPI_Op mympiop) const
865 {
866  if(*rvec.commGrid != *commGrid)
867  {
868  SpParHelper::Print("Grids are not comparable, SpParMat::Reduce() fails!", commGrid->GetWorld());
869  MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
870  }
871  switch(dim)
872  {
873  case Column: // pack along the columns, result is a vector of size n
874  {
875  // We can't use rvec's distribution (rows first, columns later) here
876  // because the ownership model of the vector has the order P(0,0), P(0,1),...
877  // column reduction will first generate vector distribution in P(0,0), P(1,0),... order.
878 
879  IT n_thiscol = getlocalcols(); // length assigned to this processor column
880  int colneighs = commGrid->GetGridRows(); // including oneself
881  int colrank = commGrid->GetRankInProcCol();
882 
883  GIT * loclens = new GIT[colneighs];
884  GIT * lensums = new GIT[colneighs+1](); // begin/end points of local lengths
885 
886  GIT n_perproc = n_thiscol / colneighs; // length on a typical processor
887  if(colrank == colneighs-1)
888  loclens[colrank] = n_thiscol - (n_perproc*colrank);
889  else
890  loclens[colrank] = n_perproc;
891 
892  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetColWorld());
893  std::partial_sum(loclens, loclens+colneighs, lensums+1); // loclens and lensums are different, but both would fit in 32-bits
894 
895  std::vector<VT> trarr;
896  typename DER::SpColIter colit = spSeq->begcol();
897  for(int i=0; i< colneighs; ++i)
898  {
899  VT * sendbuf = new VT[loclens[i]];
900  std::fill(sendbuf, sendbuf+loclens[i], id); // fill with identity
901 
902  for(; colit != spSeq->endcol() && colit.colid() < lensums[i+1]; ++colit) // iterate over a portion of columns
903  {
904  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit) // all nonzeros in this column
905  {
906  sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
907  }
908  }
909 
910  VT * recvbuf = NULL;
911  if(colrank == i)
912  {
913  trarr.resize(loclens[i]);
914  recvbuf = SpHelper::p2a(trarr);
915  }
916  MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), mympiop, i, commGrid->GetColWorld()); // root = i
917  delete [] sendbuf;
918  }
919  DeleteAll(loclens, lensums);
920 
921  GIT reallen; // Now we have to transpose the vector
922  GIT trlen = trarr.size();
923  int diagneigh = commGrid->GetComplementRank();
924  MPI_Status status;
925  MPI_Sendrecv(&trlen, 1, MPIType<IT>(), diagneigh, TRNNZ, &reallen, 1, MPIType<IT>(), diagneigh, TRNNZ, commGrid->GetWorld(), &status);
926 
927  rvec.arr.resize(reallen);
928  MPI_Sendrecv(SpHelper::p2a(trarr), trlen, MPIType<VT>(), diagneigh, TRX, SpHelper::p2a(rvec.arr), reallen, MPIType<VT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
929  rvec.glen = getncol(); // ABAB: Put a sanity check here
930  break;
931 
932  }
933  case Row: // pack along the rows, result is a vector of size m
934  {
935  rvec.glen = getnrow();
936  int rowneighs = commGrid->GetGridCols();
937  int rowrank = commGrid->GetRankInProcRow();
938  GIT * loclens = new GIT[rowneighs];
939  GIT * lensums = new GIT[rowneighs+1](); // begin/end points of local lengths
940  loclens[rowrank] = rvec.MyLocLength();
941  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetRowWorld());
942  std::partial_sum(loclens, loclens+rowneighs, lensums+1);
943  try
944  {
945  rvec.arr.resize(loclens[rowrank], id);
946 
947  // keeping track of all nonzero iterators within columns at once is unscalable w.r.t. memory (due to sqrt(p) scaling)
948  // thus we'll do batches of column as opposed to all columns at once. 5 million columns take 80MB (two pointers per column)
949  #define MAXCOLUMNBATCH 5 * 1024 * 1024
950  typename DER::SpColIter begfinger = spSeq->begcol(); // beginning finger to columns
951 
952  // Each processor on the same processor row should execute the SAME number of reduce calls
953  int numreducecalls = (int) ceil(static_cast<float>(spSeq->getnzc()) / static_cast<float>(MAXCOLUMNBATCH));
954  int maxreducecalls;
955  MPI_Allreduce( &numreducecalls, &maxreducecalls, 1, MPI_INT, MPI_MAX, commGrid->GetRowWorld());
956 
957  for(int k=0; k< maxreducecalls; ++k)
958  {
959  std::vector<typename DER::SpColIter::NzIter> nziters;
960  typename DER::SpColIter curfinger = begfinger;
961  for(; curfinger != spSeq->endcol() && nziters.size() < MAXCOLUMNBATCH ; ++curfinger)
962  {
963  nziters.push_back(spSeq->begnz(curfinger));
964  }
965  for(int i=0; i< rowneighs; ++i) // step by step to save memory
966  {
967  VT * sendbuf = new VT[loclens[i]];
968  std::fill(sendbuf, sendbuf+loclens[i], id); // fill with identity
969 
970  typename DER::SpColIter colit = begfinger;
971  IT colcnt = 0; // "processed column" counter
972  for(; colit != curfinger; ++colit, ++colcnt) // iterate over this batch of columns until curfinger
973  {
974  typename DER::SpColIter::NzIter nzit = nziters[colcnt];
975  for(; nzit != spSeq->endnz(colit) && nzit.rowid() < lensums[i+1]; ++nzit) // a portion of nonzeros in this column
976  {
977  sendbuf[nzit.rowid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[nzit.rowid()-lensums[i]]);
978  }
979  nziters[colcnt] = nzit; // set the new finger
980  }
981 
982  VT * recvbuf = NULL;
983  if(rowrank == i)
984  {
985  for(int j=0; j< loclens[i]; ++j)
986  {
987  sendbuf[j] = __binary_op(rvec.arr[j], sendbuf[j]); // rvec.arr will be overwritten by MPI_Reduce, so fold its current contents in first
988  }
989  recvbuf = SpHelper::p2a(rvec.arr);
990  }
991  MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), mympiop, i, commGrid->GetRowWorld()); // root = i
992  delete [] sendbuf;
993  }
994  begfinger = curfinger; // set the next begfinger
995  }
996  DeleteAll(loclens, lensums);
997  }
998  catch (std::length_error& le)
999  {
1000  std::cerr << "Length error: " << le.what() << std::endl;
1001  }
1002  break;
1003  }
1004  default:
1005  {
1006  std::cout << "Unknown reduction dimension, returning empty vector" << std::endl;
1007  break;
1008  }
1009  }
1010 }
1011 
1012 #ifndef KSELECTLIMIT
1013 #define KSELECTLIMIT 10000
1014 #endif
1015 
1021 template <class IT, class NT, class DER>
1022 template <typename VT, typename GIT>
1023 bool SpParMat<IT,NT,DER>::Kselect(FullyDistSpVec<GIT,VT> & kth, IT k_limit, int kselectVersion) const
1024 {
1025 #ifdef COMBBLAS_DEBUG
1026  FullyDistVec<GIT,VT> test1(kth.getcommgrid());
1027  FullyDistVec<GIT,VT> test2(kth.getcommgrid());
1028  Kselect1(test1, k_limit, myidentity<NT>());
1029  Kselect2(test2, k_limit);
1030  if(test1 == test2)
1031  SpParHelper::Print("Kselect1 and Kselect2 producing same results\n");
1032  else
1033  {
1034  SpParHelper::Print("WARNING: Kselect1 and Kselect2 producing DIFFERENT results\n");
1035  test1.PrintToFile("test1");
1036  test2.PrintToFile("test2");
1037  }
1038 #endif
1039 
1040  if(kselectVersion==1 || k_limit < KSELECTLIMIT)
1041  {
1042  return Kselect1(kth, k_limit, myidentity<NT>());
1043  }
1044  else
1045  {
1046  FullyDistVec<GIT,VT> kthAll ( getcommgrid());
1047  bool ret = Kselect2(kthAll, k_limit);
1048  FullyDistSpVec<GIT,VT> temp = EWiseApply<VT>(kth, kthAll,
1049  [](VT spval, VT dval){return dval;},
1050  [](VT spval, VT dval){return true;},
1051  false, NT());
1052  kth = temp;
1053  return ret;
1054  }
1055 }
1056 
1057 
1061 template <class IT, class NT, class DER>
1062 template <typename VT, typename GIT>
1063 bool SpParMat<IT,NT,DER>::Kselect(FullyDistVec<GIT,VT> & rvec, IT k_limit, int kselectVersion) const
1064 {
1065 #ifdef COMBBLAS_DEBUG
1066  FullyDistVec<GIT,VT> test1(rvec.getcommgrid());
1067  FullyDistVec<GIT,VT> test2(rvec.getcommgrid());
1068  Kselect1(test1, k_limit, myidentity<NT>());
1069  Kselect2(test2, k_limit);
1070  if(test1 == test2)
1071  SpParHelper::Print("Kselect1 and Kselect2 producing same results\n");
1072  else
1073  {
1074  SpParHelper::Print("WARNING: Kselect1 and Kselect2 producing DIFFERENT results\n");
1075  //test1.PrintToFile("test1");
1076  //test2.PrintToFile("test2");
1077  }
1078 #endif
1079 
1080  if(kselectVersion==1 || k_limit < KSELECTLIMIT)
1081  return Kselect1(rvec, k_limit, myidentity<NT>());
1082  else
1083  return Kselect2(rvec, k_limit);
1084 
1085 }
1086 
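Putting the two versions together, a caller that wants the k-th largest entry of every column would do something like this (a usage sketch, same assumptions as the earlier sketches; per the dispatch above, the Kselect2 path is only taken when k_limit >= KSELECTLIMIT):

void kth_largest_per_column(const ParMat & A, int64_t k)
{
    // rvec[j] ends up holding the k-th largest nonzero of column j; columns with
    // fewer than k nonzeros get the fallback values described in Kselect1/Kselect2.
    FullyDistVec<int64_t, double> rvec(A.getcommgrid());
    A.Kselect(rvec, k, 2);   // kselectVersion = 2; Kselect1 is still used when k < KSELECTLIMIT
}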
1087 /* identify the k-th maximum element in each column of a matrix
1088 ** if the number of nonzeros in a column is less than or equal to k, return minimum entry
1089 ** Caution: this is a preliminary implementation: needs 3*(n/sqrt(p))*k memory per processor
1090 ** this memory requirement is too high for larger k
1091  */
1092 template <class IT, class NT, class DER>
1093 template <typename VT, typename GIT, typename _UnaryOperation> // GIT: global index type of vector
1094 bool SpParMat<IT,NT,DER>::Kselect1(FullyDistVec<GIT,VT> & rvec, IT k, _UnaryOperation __unary_op) const
1095 {
1096  if(*rvec.commGrid != *commGrid)
1097  {
1098  SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
1099  MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1100  }
1101 
1102  FullyDistVec<IT, IT> nnzPerColumn (getcommgrid());
1103  Reduce(nnzPerColumn, Column, std::plus<IT>(), (IT)0, [](NT val){return (IT)1;});
1104  IT maxnnzPerColumn = nnzPerColumn.Reduce(maximum<IT>(), (IT)0);
1105  if(k>maxnnzPerColumn)
1106  {
1107  SpParHelper::Print("Kselect: k is greater than maxNnzInColumn. Calling Reduce instead...\n");
1108  Reduce(rvec, Column, minimum<NT>(), static_cast<NT>(0));
1109  return false;
1110  }
1111 
1112  IT n_thiscol = getlocalcols(); // length (number of columns) assigned to this processor (and processor column)
1113 
1114  // check, memory should be min(n_thiscol*k, local nnz)
1115  // hence we will not overflow for very large k
1116  std::vector<VT> sendbuf(n_thiscol*k);
1117  std::vector<IT> send_coldisp(n_thiscol+1,0);
1118  std::vector<IT> local_coldisp(n_thiscol+1,0);
1119 
1120 
1121  //displacement of local columns
1122  //local_coldisp is the displacement of all nonzeros per column
1123  //send_coldisp is the displacement of k nonzeros per column
1124  IT nzc = 0;
1125  if(spSeq->getnnz()>0)
1126  {
1127  typename DER::SpColIter colit = spSeq->begcol();
1128  for(IT i=0; i<n_thiscol; ++i)
1129  {
1130  local_coldisp[i+1] = local_coldisp[i];
1131  send_coldisp[i+1] = send_coldisp[i];
1132  if(i==colit.colid())
1133  {
1134  local_coldisp[i+1] += colit.nnz();
1135  if(colit.nnz()>=k)
1136  send_coldisp[i+1] += k;
1137  else
1138  send_coldisp[i+1] += colit.nnz();
1139  colit++;
1140  nzc++;
1141  }
1142  }
1143  }
1144  assert(local_coldisp[n_thiscol] == spSeq->getnnz());
1145 
1146  // a copy of local part of the matrix
1147  // this can be avoided if we write our own local kselect function instead of using partial_sort
1148  std::vector<VT> localmat(spSeq->getnnz());
1149 
1150 
1151 #ifdef THREADED
1152 #pragma omp parallel for
1153 #endif
1154  for(IT i=0; i<nzc; i++)
1155  //for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1156  {
1157  typename DER::SpColIter colit = spSeq->begcol() + i;
1158  IT colid = colit.colid();
1159  IT idx = local_coldisp[colid];
1160  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
1161  {
1162  localmat[idx++] = static_cast<VT>(__unary_op(nzit.value()));
1163  }
1164 
1165  if(colit.nnz()<=k)
1166  {
1167  std::sort(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid+1], std::greater<VT>());
1168  std::copy(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid+1], sendbuf.begin()+send_coldisp[colid]);
1169  }
1170  else
1171  {
1172  std::partial_sort(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid]+k, localmat.begin()+local_coldisp[colid+1], std::greater<VT>());
1173  std::copy(localmat.begin()+local_coldisp[colid], localmat.begin()+local_coldisp[colid]+k, sendbuf.begin()+send_coldisp[colid]);
1174  }
1175  }
1176 
1177  std::vector<VT>().swap(localmat);
1178  std::vector<IT>().swap(local_coldisp);
1179 
1180  std::vector<VT> recvbuf(n_thiscol*k);
1181  std::vector<VT> tempbuf(n_thiscol*k);
1182  std::vector<IT> recv_coldisp(n_thiscol+1);
1183  std::vector<IT> templen(n_thiscol);
1184 
1185  int colneighs = commGrid->GetGridRows();
1186  int colrank = commGrid->GetRankInProcCol();
1187 
1188  for(int p=2; p <= colneighs; p*=2)
1189  {
1190 
1191  if(colrank%p == p/2) // this processor is a sender in this round
1192  {
1193  int receiver = colrank - ceil(p/2);
1194  MPI_Send(send_coldisp.data(), n_thiscol+1, MPIType<IT>(), receiver, 0, commGrid->GetColWorld());
1195  MPI_Send(sendbuf.data(), send_coldisp[n_thiscol], MPIType<VT>(), receiver, 1, commGrid->GetColWorld());
1196  //break;
1197  }
1198  else if(colrank%p == 0) // this processor is a receiver in this round
1199  {
1200  int sender = colrank + ceil(p/2);
1201  if(sender < colneighs)
1202  {
1203 
1204  MPI_Recv(recv_coldisp.data(), n_thiscol+1, MPIType<IT>(), sender, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1205  MPI_Recv(recvbuf.data(), recv_coldisp[n_thiscol], MPIType<VT>(), sender, 1, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1206 
1207 
1208 
1209 #ifdef THREADED
1210 #pragma omp parallel for
1211 #endif
1212  for(IT i=0; i<n_thiscol; ++i)
1213  {
1214  // partial merge until first k elements
1215  IT j=send_coldisp[i], l=recv_coldisp[i];
1216  //IT templen[i] = k*i;
1217  IT offset = k*i;
1218  IT lid = 0;
1219  for(; j<send_coldisp[i+1] && l<recv_coldisp[i+1] && lid<k;)
1220  {
1221  if(sendbuf[j] > recvbuf[l]) // decision
1222  tempbuf[offset+lid++] = sendbuf[j++];
1223  else
1224  tempbuf[offset+lid++] = recvbuf[l++];
1225  }
1226  while(j<send_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = sendbuf[j++];
1227  while(l<recv_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = recvbuf[l++];
1228  templen[i] = lid;
1229  }
1230 
1231  send_coldisp[0] = 0;
1232  for(IT i=0; i<n_thiscol; i++)
1233  {
1234  send_coldisp[i+1] = send_coldisp[i] + templen[i];
1235  }
1236 
1237 
1238 #ifdef THREADED
1239 #pragma omp parallel for
1240 #endif
1241  for(IT i=0; i<n_thiscol; i++) // direct copy
1242  {
1243  IT offset = k*i;
1244  std::copy(tempbuf.begin()+offset, tempbuf.begin()+offset+templen[i], sendbuf.begin() + send_coldisp[i]);
1245  }
1246  }
1247  }
1248  }
1249  MPI_Barrier(commGrid->GetWorld());
1250  std::vector<VT> kthItem(n_thiscol);
1251 
1252  int root = commGrid->GetDiagOfProcCol();
1253  if(root==0 && colrank==0) // rank 0
1254  {
1255 #ifdef THREADED
1256 #pragma omp parallel for
1257 #endif
1258  for(IT i=0; i<n_thiscol; i++)
1259  {
1260  IT nitems = send_coldisp[i+1]-send_coldisp[i];
1261  if(nitems >= k)
1262  kthItem[i] = sendbuf[send_coldisp[i]+k-1];
1263  else
1264  kthItem[i] = std::numeric_limits<VT>::min(); // return minimum possible value if a column is empty or has less than k elements
1265  }
1266  }
1267  else if(root>0 && colrank==0) // send to the diagonal processor of this processor column
1268  {
1269 #ifdef THREADED
1270 #pragma omp parallel for
1271 #endif
1272  for(IT i=0; i<n_thiscol; i++)
1273  {
1274  IT nitems = send_coldisp[i+1]-send_coldisp[i];
1275  if(nitems >= k)
1276  kthItem[i] = sendbuf[send_coldisp[i]+k-1];
1277  else
1278  kthItem[i] = std::numeric_limits<VT>::min(); // return minimum possible value if a column is empty or has less than k elements
1279  }
1280  MPI_Send(kthItem.data(), n_thiscol, MPIType<VT>(), root, 0, commGrid->GetColWorld());
1281  }
1282  else if(root>0 && colrank==root)
1283  {
1284  MPI_Recv(kthItem.data(), n_thiscol, MPIType<VT>(), 0, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1285  }
1286 
1287  std::vector <int> sendcnts;
1288  std::vector <int> dpls;
1289  if(colrank==root)
1290  {
1291  int proccols = commGrid->GetGridCols();
1292  IT n_perproc = n_thiscol / proccols;
1293  sendcnts.resize(proccols);
1294  std::fill(sendcnts.data(), sendcnts.data()+proccols-1, n_perproc);
1295  sendcnts[proccols-1] = n_thiscol - (n_perproc * (proccols-1));
1296  dpls.resize(proccols,0); // displacements (zero initialized pid)
1297  std::partial_sum(sendcnts.data(), sendcnts.data()+proccols-1, dpls.data()+1);
1298  }
1299 
1300  int rowroot = commGrid->GetDiagOfProcRow();
1301  int recvcnts = 0;
1302  // scatter received data size
1303  MPI_Scatter(sendcnts.data(),1, MPI_INT, & recvcnts, 1, MPI_INT, rowroot, commGrid->GetRowWorld());
1304 
1305  rvec.arr.resize(recvcnts);
1306  MPI_Scatterv(kthItem.data(),sendcnts.data(), dpls.data(), MPIType<VT>(), rvec.arr.data(), rvec.arr.size(), MPIType<VT>(),rowroot, commGrid->GetRowWorld());
1307  rvec.glen = getncol();
1308  return true;
1309 }
1310 
1311 
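The per-column merge used in each round of the pairwise exchange above is simply a k-bounded merge of two descending-sorted runs; in isolation it looks like this (illustrative serial sketch, not CombBLAS code):

#include <cstddef>
#include <vector>

// Merge two descending-sorted vectors, keeping at most the k largest values;
// Kselect1 performs exactly this per column when combining sendbuf and recvbuf.
std::vector<double> MergeTopK(const std::vector<double> & a,
                              const std::vector<double> & b, std::size_t k)
{
    std::vector<double> out;
    std::size_t i = 0, j = 0;
    while (out.size() < k && i < a.size() && j < b.size())
        out.push_back(a[i] > b[j] ? a[i++] : b[j++]);
    while (out.size() < k && i < a.size()) out.push_back(a[i++]);
    while (out.size() < k && j < b.size()) out.push_back(b[j++]);
    return out;
}
// Example: a = {9, 7, 2}, b = {8, 3}, k = 3  ->  {9, 8, 7}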
1312 
1313 template <class IT, class NT, class DER>
1314 template <typename VT, typename GIT, typename _UnaryOperation> // GIT: global index type of vector
1315 bool SpParMat<IT,NT,DER>::Kselect1(FullyDistSpVec<GIT,VT> & rvec, IT k, _UnaryOperation __unary_op) const
1316 {
1317  if(*rvec.commGrid != *commGrid)
1318  {
1319  SpParHelper::Print("Grids are not comparable, SpParMat::Kselect() fails!", commGrid->GetWorld());
1320  MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1321  }
1322 
1323  /*
1324  FullyDistVec<IT, IT> nnzPerColumn (getcommgrid());
1325  Reduce(nnzPerColumn, Column, plus<IT>(), (IT)0, [](NT val){return (IT)1;});
1326  IT maxnnzPerColumn = nnzPerColumn.Reduce(maximum<IT>(), (IT)0);
1327  if(k>maxnnzPerColumn)
1328  {
1329  SpParHelper::Print("Kselect: k is greater than maxNnzInColumn. Calling Reduce instead...\n");
1330  Reduce(rvec, Column, minimum<NT>(), static_cast<NT>(0));
1331  return false;
1332  }
1333  */
1334 
1335  IT n_thiscol = getlocalcols(); // length (number of columns) assigned to this processor (and processor column)
1336  MPI_Comm World = rvec.commGrid->GetWorld();
1337  MPI_Comm ColWorld = rvec.commGrid->GetColWorld();
1338  MPI_Comm RowWorld = rvec.commGrid->GetRowWorld();
1339  int colneighs = commGrid->GetGridRows();
1340  int colrank = commGrid->GetRankInProcCol();
1341  int coldiagrank = commGrid->GetDiagOfProcCol();
1342 
1343 
1344  //replicate sparse indices along processor column
1345  int accnz;
1346  int32_t trxlocnz;
1347  GIT lenuntil;
1348  int32_t *trxinds, *activeCols;
1349  VT *trxnums, *numacc;
1350  TransposeVector(World, rvec, trxlocnz, lenuntil, trxinds, trxnums, true);
1351 
1352  if(rvec.commGrid->GetGridRows() > 1)
1353  {
1354  //TODO: we only need to communicate indices
1355  AllGatherVector(ColWorld, trxlocnz, lenuntil, trxinds, trxnums, activeCols, numacc, accnz, true); // trxindS/trxnums deallocated, indacc/numacc allocated, accnz set
1356  }
1357  else
1358  {
1359  accnz = trxlocnz;
1360  activeCols = trxinds; //aliasing ptr
1361  numacc = trxnums; //aliasing ptr
1362  }
1363 
1364 
1365  std::vector<bool> isactive(n_thiscol,false);
1366  for(int i=0; i<accnz ; i++)
1367  {
1368  isactive[activeCols[i]] = true;
1369  //cout << indacc[i] << " ";
1370  }
1371  IT nActiveCols = accnz;//count_if(isactive.begin(), isactive.end(), [](bool ac){return ac;});
1372  // check, memory should be min(n_thiscol*k, local nnz)
1373  // hence we will not overflow for very large k
1374 
1375  std::vector<IT> send_coldisp(n_thiscol+1,0);
1376  std::vector<IT> local_coldisp(n_thiscol+1,0);
1377  //vector<VT> sendbuf(nActiveCols*k);
1378  VT * sendbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1379 
1380 
1381  //displacement of local columns
1382  //local_coldisp is the displacement of all nonzeros per column
1383  //send_coldisp is the displacement of k nonzeros per column
1384  IT nzc = 0;
1385  if(spSeq->getnnz()>0)
1386  {
1387  typename DER::SpColIter colit = spSeq->begcol();
1388  for(IT i=0; i<n_thiscol; ++i)
1389  {
1390  local_coldisp[i+1] = local_coldisp[i];
1391  send_coldisp[i+1] = send_coldisp[i];
1392  if(i==colit.colid())
1393  {
1394  if(isactive[i])
1395  {
1396  local_coldisp[i+1] += colit.nnz();
1397  if(colit.nnz()>=k)
1398  send_coldisp[i+1] += k;
1399  else
1400  send_coldisp[i+1] += colit.nnz();
1401  }
1402  colit++;
1403  nzc++;
1404  }
1405  }
1406  }
1407 
1408  // a copy of local part of the matrix
1409  // this can be avoided if we write our own local kselect function instead of using partial_sort
1410  //vector<VT> localmat(local_coldisp[n_thiscol]);
1411  VT * localmat = static_cast<VT *> (::operator new (local_coldisp[n_thiscol]*sizeof(VT)));
1412 
1413 
1414 #ifdef THREADED
1415 #pragma omp parallel for
1416 #endif
1417  for(IT i=0; i<nzc; i++)
1418  //for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1419  {
1420  typename DER::SpColIter colit = spSeq->begcol() + i;
1421  IT colid = colit.colid();
1422  if(isactive[colid])
1423  {
1424  IT idx = local_coldisp[colid];
1425  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit < spSeq->endnz(colit); ++nzit)
1426  {
1427  localmat[idx++] = static_cast<VT>(__unary_op(nzit.value()));
1428  }
1429 
1430  if(colit.nnz()<=k)
1431  {
1432  sort(localmat+local_coldisp[colid], localmat+local_coldisp[colid+1], std::greater<VT>());
1433  std::copy(localmat+local_coldisp[colid], localmat+local_coldisp[colid+1], sendbuf+send_coldisp[colid]);
1434  }
1435  else
1436  {
1437  partial_sort(localmat+local_coldisp[colid], localmat+local_coldisp[colid]+k, localmat+local_coldisp[colid+1], std::greater<VT>());
1438  std::copy(localmat+local_coldisp[colid], localmat+local_coldisp[colid]+k, sendbuf+send_coldisp[colid]);
1439  }
1440  }
1441  }
1442 
1443 
1444  //vector<VT>().swap(localmat);
1445  ::operator delete(localmat);
1446  std::vector<IT>().swap(local_coldisp);
1447 
1448  VT * recvbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1449  VT * tempbuf = static_cast<VT *> (::operator new (n_thiscol*k*sizeof(VT)));
1450  //vector<VT> recvbuf(n_thiscol*k);
1451  //vector<VT> tempbuf(n_thiscol*k);
1452  std::vector<IT> recv_coldisp(n_thiscol+1);
1453  std::vector<IT> templen(n_thiscol);
1454 
1455 
1456 
1457  for(int p=2; p <= colneighs; p*=2)
1458  {
1459 
1460  if(colrank%p == p/2) // this processor is a sender in this round
1461  {
1462  int receiver = colrank - ceil(p/2);
1463  MPI_Send(send_coldisp.data(), n_thiscol+1, MPIType<IT>(), receiver, 0, commGrid->GetColWorld());
1464  MPI_Send(sendbuf, send_coldisp[n_thiscol], MPIType<VT>(), receiver, 1, commGrid->GetColWorld());
1465  //break;
1466  }
1467  else if(colrank%p == 0) // this processor is a receiver in this round
1468  {
1469  int sender = colrank + ceil(p/2);
1470  if(sender < colneighs)
1471  {
1472 
1473  MPI_Recv(recv_coldisp.data(), n_thiscol+1, MPIType<IT>(), sender, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1474  MPI_Recv(recvbuf, recv_coldisp[n_thiscol], MPIType<VT>(), sender, 1, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1475 
1476 
1477 
1478 #ifdef THREADED
1479 #pragma omp parallel for
1480 #endif
1481  for(IT i=0; i<n_thiscol; ++i)
1482  {
1483  // partial merge until first k elements
1484  IT j=send_coldisp[i], l=recv_coldisp[i];
1485  //IT templen[i] = k*i;
1486  IT offset = k*i;
1487  IT lid = 0;
1488  for(; j<send_coldisp[i+1] && l<recv_coldisp[i+1] && lid<k;)
1489  {
1490  if(sendbuf[j] > recvbuf[l]) // decision
1491  tempbuf[offset+lid++] = sendbuf[j++];
1492  else
1493  tempbuf[offset+lid++] = recvbuf[l++];
1494  }
1495  while(j<send_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = sendbuf[j++];
1496  while(l<recv_coldisp[i+1] && lid<k) tempbuf[offset+lid++] = recvbuf[l++];
1497  templen[i] = lid;
1498  }
1499 
1500  send_coldisp[0] = 0;
1501  for(IT i=0; i<n_thiscol; i++)
1502  {
1503  send_coldisp[i+1] = send_coldisp[i] + templen[i];
1504  }
1505 
1506 
1507 #ifdef THREADED
1508 #pragma omp parallel for
1509 #endif
1510  for(IT i=0; i<n_thiscol; i++) // direct copy
1511  {
1512  IT offset = k*i;
1513  std::copy(tempbuf+offset, tempbuf+offset+templen[i], sendbuf + send_coldisp[i]);
1514  }
1515  }
1516  }
1517  }
1518  MPI_Barrier(commGrid->GetWorld());
1519 
1520 
1521 
1522 
1523  /*--------------------------------------------------------
1524  At this point, top k elements in every active column
1525  are gathered on the first processor row, P(0,:).
1526 
1527  Next step: At P(0,i) find the kth largest element in
1528  active columns belonging to P(0,i).
1529  If nnz in a column is less than k, keep the largest nonzero.
1530  If a column is empty, keep the lowest numeric value.
1531  --------------------------------------------------------*/
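 /* Worked example (illustrative, not part of the original source): with k = 3,
    a column whose gathered entries are {9, 7, 4, 2} yields a kth item of 4;
    a column holding only {5, 1} (fewer than k nonzeros) yields its last sorted
    entry, 1; and an empty column yields std::numeric_limits<VT>::min().      */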
1532 
1533  std::vector<VT> kthItem(nActiveCols); // kth elements of local active columns
1534  if(colrank==0)
1535  {
1536 #ifdef THREADED
1537 #pragma omp parallel for
1538 #endif
1539  for(IT i=0; i<nActiveCols; i++)
1540  {
1541  IT ai = activeCols[i]; // active column index
1542  IT nitems = send_coldisp[ai+1]-send_coldisp[ai];
1543  if(nitems >= k)
1544  kthItem[i] = sendbuf[send_coldisp[ai]+k-1];
1545  else if (nitems==0)
1546  kthItem[i] = std::numeric_limits<VT>::min(); // return minimum possible value if a column is empty
1547  else
1548  kthItem[i] = sendbuf[send_coldisp[ai+1]-1]; // returning the last entry if nnz in this column is less than k
1549 
1550  }
1551  }
1552 
1553  /*--------------------------------------------------------
1554  At this point, kth largest elements in every active column
1555  are gathered on the first processor row, P(0,:).
1556 
1557  Next step: Send the kth largest elements from P(0,i) to P(i,i)
1558  Nothing to do for P(0,0)
1559  --------------------------------------------------------*/
1560  if(coldiagrank>0 && colrank==0)
1561  {
1562  MPI_Send(kthItem.data(), nActiveCols, MPIType<VT>(), coldiagrank, 0, commGrid->GetColWorld());
1563  }
1564  else if(coldiagrank>0 && colrank==coldiagrank) // receive in the diagonal processor
1565  {
1566  MPI_Recv(kthItem.data(), nActiveCols, MPIType<VT>(), 0, 0, commGrid->GetColWorld(), MPI_STATUS_IGNORE);
1567  }
1568 
1569  /*--------------------------------------------------------
1570  At this point, kth largest elements in every active column
1571  are gathered on the diagonal processors P(i,i).
1572 
1573  Next step: Scatter the kth largest elements from P(i,i)
1574  to all processors in the ith row, P(i,:).
1575  Each processor receives exactly its local nnz of rvec entries
1576  so that the received data can be directly put in rvec.
1577  --------------------------------------------------------*/
1578  int rowroot = commGrid->GetDiagOfProcRow();
1579  int proccols = commGrid->GetGridCols();
1580  std::vector <int> sendcnts(proccols,0);
1581  std::vector <int> dpls(proccols,0);
1582  int lsize = rvec.ind.size();
1583  // local sizes of the input vector will be sent from the diagonal processor
1584  MPI_Gather(&lsize,1, MPI_INT, sendcnts.data(), 1, MPI_INT, rowroot, RowWorld);
1585  std::partial_sum(sendcnts.data(), sendcnts.data()+proccols-1, dpls.data()+1);
1586  MPI_Scatterv(kthItem.data(),sendcnts.data(), dpls.data(), MPIType<VT>(), rvec.num.data(), rvec.num.size(), MPIType<VT>(),rowroot, RowWorld);
1587 
1588  ::operator delete(sendbuf);
1589  ::operator delete(recvbuf);
1590  ::operator delete(tempbuf);
1591 
1592 
1593  return true;
1594 }
1595 
1596 // only defined for symmetric matrix
1597 template <class IT, class NT, class DER>
1598 IT SpParMat<IT,NT,DER>::Bandwidth() const
1599 {
1600  IT upperlBW = -1;
1601  IT lowerlBW = -1;
1602  IT m_perproc = getnrow() / commGrid->GetGridRows();
1603  IT n_perproc = getncol() / commGrid->GetGridCols();
1604  IT moffset = commGrid->GetRankInProcCol() * m_perproc;
1605  IT noffset = commGrid->GetRankInProcRow() * n_perproc;
1606 
1607  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1608  {
1609  IT diagrow = colit.colid() + noffset;
1610  typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1611  if(nzit != spSeq->endnz(colit)) // nonempty column
1612  {
1613  IT firstrow = nzit.rowid() + moffset;
1614  IT lastrow = (nzit+ colit.nnz()-1).rowid() + moffset;
1615 
1616  if(firstrow <= diagrow) // upper diagonal
1617  {
1618  IT dev = diagrow - firstrow;
1619  if(upperlBW < dev) upperlBW = dev;
1620  }
1621  if(lastrow >= diagrow) // lower diagonal
1622  {
1623  IT dev = lastrow - diagrow;
1624  if(lowerlBW < dev) lowerlBW = dev;
1625  }
1626  }
1627  }
1628  IT upperBW;
1629  //IT lowerBW;
1630  MPI_Allreduce( &upperlBW, &upperBW, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
1631  //MPI_Allreduce( &lowerlBW, &lowerBW, 1, MPIType<IT>(), MPI_MAX, commGrid->GetWorld());
1632 
1633  //return (upperBW + lowerBW + 1);
1634  return (upperBW);
1635 }
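 /* Worked example (illustrative, not part of the original source): for a symmetric
    tridiagonal matrix every nonzero sits at most one row away from the diagonal,
    so the value returned above is 1; the commented-out variant would instead
    return the full bandwidth upperBW + lowerBW + 1 = 3.                        */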
1636 
1637 
1638 
1639 // only defined for symmetric matrix
1640 template <class IT, class NT, class DER>
1641 IT SpParMat<IT,NT,DER>::Profile() const
1642 {
1643  int colrank = commGrid->GetRankInProcRow();
1644  IT cols = getncol();
1645  IT rows = getnrow();
1646  IT m_perproc = rows / commGrid->GetGridRows();
1647  IT n_perproc = cols / commGrid->GetGridCols();
1648  IT moffset = commGrid->GetRankInProcCol() * m_perproc;
1649  IT noffset = colrank * n_perproc;
1650 
1651 
1652  int pc = commGrid->GetGridCols();
1653  IT n_thisproc;
1654  if(colrank!=pc-1 ) n_thisproc = n_perproc;
1655  else n_thisproc = cols - (pc-1)*n_perproc;
1656 
1657 
1658  std::vector<IT> firstRowInCol(n_thisproc,getnrow());
1659  std::vector<IT> lastRowInCol(n_thisproc,-1);
1660 
1661  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over columns
1662  {
1663  IT diagrow = colit.colid() + noffset;
1664  typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1665  if(nzit != spSeq->endnz(colit)) // nonempty column
1666  {
1667  IT firstrow = nzit.rowid() + moffset;
1668  IT lastrow = (nzit+ colit.nnz()-1).rowid() + moffset;
1669  if(firstrow <= diagrow) // upper diagonal
1670  {
1671  firstRowInCol[colit.colid()] = firstrow;
1672  }
1673  if(lastrow >= diagrow) // lower diagonal
1674  {
1675  lastRowInCol[colit.colid()] = lastrow;
1676  }
1677  }
1678  }
1679 
1680  std::vector<IT> firstRowInCol_global(n_thisproc,getnrow());
1681  //vector<IT> lastRowInCol_global(n_thisproc,-1);
1682  MPI_Allreduce( firstRowInCol.data(), firstRowInCol_global.data(), n_thisproc, MPIType<IT>(), MPI_MIN, commGrid->colWorld);
1683  //MPI_Allreduce( lastRowInCol.data(), lastRowInCol_global.data(), n_thisproc, MPIType<IT>(), MPI_MAX, commGrid->GetColWorld());
1684 
1685  IT profile = 0;
1686  for(IT i=0; i<n_thisproc; i++)
1687  {
1688  if(firstRowInCol_global[i]==rows) // empty column
1689  profile++;
1690  else
1691  profile += (i + noffset - firstRowInCol_global[i]);
1692  }
1693 
1694  IT profile_global = 0;
1695  MPI_Allreduce( &profile, &profile_global, 1, MPIType<IT>(), MPI_SUM, commGrid->rowWorld);
1696 
1697  return (profile_global);
1698 }
1699 
1700 
1701 
1702 template <class IT, class NT, class DER>
1703 template <typename VT, typename GIT, typename _BinaryOperation>
1704 void SpParMat<IT,NT,DER>::MaskedReduce(FullyDistVec<GIT,VT> & rvec, FullyDistSpVec<GIT,VT> & mask, Dim dim, _BinaryOperation __binary_op, VT id, bool exclude) const
1705 {
1706  if (dim!=Column)
1707  {
1708  SpParHelper::Print("SpParMat::MaskedReduce() is only implemented for Column\n");
1709  MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1710  }
1711  MaskedReduce(rvec, mask, dim, __binary_op, id, myidentity<NT>(), exclude);
1712 }
1713 
1723 template <class IT, class NT, class DER>
1724 template <typename VT, typename GIT, typename _BinaryOperation, typename _UnaryOperation> // GIT: global index type of vector
1725 void SpParMat<IT,NT,DER>::MaskedReduce(FullyDistVec<GIT,VT> & rvec, FullyDistSpVec<GIT,VT> & mask, Dim dim, _BinaryOperation __binary_op, VT id, _UnaryOperation __unary_op, bool exclude) const
1726 {
1727  MPI_Comm World = commGrid->GetWorld();
1728  MPI_Comm ColWorld = commGrid->GetColWorld();
1729  MPI_Comm RowWorld = commGrid->GetRowWorld();
1730 
1731  if (dim!=Column)
1732  {
1733  SpParHelper::Print("SpParMat::MaskedReduce() is only implemented for Column\n");
1734  MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1735  }
1736  if(*rvec.commGrid != *commGrid)
1737  {
1738  SpParHelper::Print("Grids are not comparable, SpParMat::MaskedReduce() fails!", commGrid->GetWorld());
1739  MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
1740  }
1741 
1742  int rowneighs = commGrid->GetGridCols();
1743  int rowrank = commGrid->GetRankInProcRow();
1744  std::vector<int> rownz(rowneighs);
1745  int locnnzMask = static_cast<int> (mask.getlocnnz());
1746  rownz[rowrank] = locnnzMask;
1747  MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, rownz.data(), 1, MPI_INT, RowWorld);
1748  std::vector<int> dpls(rowneighs+1,0);
1749  std::partial_sum(rownz.begin(), rownz.end(), dpls.data()+1);
1750  int accnz = std::accumulate(rownz.begin(), rownz.end(), 0);
1751  std::vector<GIT> sendInd(locnnzMask);
1752  std::transform(mask.ind.begin(), mask.ind.end(),sendInd.begin(), bind2nd(std::plus<GIT>(), mask.RowLenUntil()));
1753 
1754  std::vector<GIT> indMask(accnz);
1755  MPI_Allgatherv(sendInd.data(), rownz[rowrank], MPIType<GIT>(), indMask.data(), rownz.data(), dpls.data(), MPIType<GIT>(), RowWorld);
1756 
1757 
1758  // We can't use rvec's distribution (rows first, columns later) here
1759  IT n_thiscol = getlocalcols(); // length assigned to this processor column
1760  int colneighs = commGrid->GetGridRows(); // including oneself
1761  int colrank = commGrid->GetRankInProcCol();
1762 
1763  GIT * loclens = new GIT[colneighs];
1764  GIT * lensums = new GIT[colneighs+1](); // begin/end points of local lengths
1765 
1766  GIT n_perproc = n_thiscol / colneighs; // length on a typical processor
1767  if(colrank == colneighs-1)
1768  loclens[colrank] = n_thiscol - (n_perproc*colrank);
1769  else
1770  loclens[colrank] = n_perproc;
1771 
1772  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<GIT>(), loclens, 1, MPIType<GIT>(), commGrid->GetColWorld());
1773  std::partial_sum(loclens, loclens+colneighs, lensums+1); // loclens and lensums are different, but both would fit in 32-bits
1774 
1775  std::vector<VT> trarr;
1776  typename DER::SpColIter colit = spSeq->begcol();
1777  for(int i=0; i< colneighs; ++i)
1778  {
1779  VT * sendbuf = new VT[loclens[i]];
1780  std::fill(sendbuf, sendbuf+loclens[i], id); // fill with identity
1781 
1782  for(; colit != spSeq->endcol() && colit.colid() < lensums[i+1]; ++colit) // iterate over a portion of columns
1783  {
1784  int k=0;
1785  typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit);
1786 
1787  for(; nzit != spSeq->endnz(colit) && k < indMask.size(); ) // all nonzeros in this column
1788  {
1789  if(nzit.rowid() < indMask[k])
1790  {
1791  if(exclude)
1792  {
1793  sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1794  }
1795  ++nzit;
1796  }
1797  else if(nzit.rowid() > indMask[k]) ++k;
1798  else
1799  {
1800  if(!exclude)
1801  {
1802  sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1803  }
1804  ++k;
1805  ++nzit;
1806  }
1807 
1808  }
1809  if(exclude)
1810  {
1811  while(nzit != spSeq->endnz(colit))
1812  {
1813  sendbuf[colit.colid()-lensums[i]] = __binary_op(static_cast<VT>(__unary_op(nzit.value())), sendbuf[colit.colid()-lensums[i]]);
1814  ++nzit;
1815  }
1816  }
1817  }
1818 
1819  VT * recvbuf = NULL;
1820  if(colrank == i)
1821  {
1822  trarr.resize(loclens[i]);
1823  recvbuf = SpHelper::p2a(trarr);
1824  }
1825  MPI_Reduce(sendbuf, recvbuf, loclens[i], MPIType<VT>(), MPIOp<_BinaryOperation, VT>::op(), i, commGrid->GetColWorld()); // root = i
1826  delete [] sendbuf;
1827  }
1828  DeleteAll(loclens, lensums);
1829 
1830  GIT reallen; // Now we have to transpose the vector
1831  GIT trlen = trarr.size();
1832  int diagneigh = commGrid->GetComplementRank();
1833  MPI_Status status;
1834  MPI_Sendrecv(&trlen, 1, MPIType<IT>(), diagneigh, TRNNZ, &reallen, 1, MPIType<IT>(), diagneigh, TRNNZ, commGrid->GetWorld(), &status);
1835 
1836  rvec.arr.resize(reallen);
1837  MPI_Sendrecv(SpHelper::p2a(trarr), trlen, MPIType<VT>(), diagneigh, TRX, SpHelper::p2a(rvec.arr), reallen, MPIType<VT>(), diagneigh, TRX, commGrid->GetWorld(), &status);
1838  rvec.glen = getncol(); // ABAB: Put a sanity check here
1839 
1840 }
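 /* Usage sketch (illustrative only; the matrix A, the grid fullWorld and the concrete
    template arguments are assumptions, not part of the original source). Sum every
    column of A while skipping the rows listed in a sparse mask:

      typedef SpParMat<int64_t, double, SpDCCols<int64_t,double> > PSpMat;
      PSpMat A(fullWorld);                                         // assume A is populated
      FullyDistSpVec<int64_t,double> mask(fullWorld, A.getnrow());
      // ... fill mask with the row indices to be excluded ...
      FullyDistVec<int64_t,double> colsums(fullWorld);
      A.MaskedReduce(colsums, mask, Column, std::plus<double>(), 0.0, true);  // exclude == true
 */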
1841 
1842 
1843 
1844 
1845 template <class IT, class NT, class DER>
1846 template <typename NNT,typename NDER>
1847 SpParMat< IT,NT,DER >::operator SpParMat< IT,NNT,NDER > () const
1848 {
1849  NDER * convert = new NDER(*spSeq);
1850  return SpParMat<IT,NNT,NDER> (convert, commGrid);
1851 }
1852 
1854 template <class IT, class NT, class DER>
1855 template <typename NIT, typename NNT,typename NDER>
1856 SpParMat< IT,NT,DER >::operator SpParMat< NIT,NNT,NDER > () const
1857 {
1858  NDER * convert = new NDER(*spSeq);
1859  return SpParMat<NIT,NNT,NDER> (convert, commGrid);
1860 }
1861 
1866 template <class IT, class NT, class DER>
1867 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::SubsRefCol (const std::vector<IT> & ci) const
1868 {
1869  std::vector<IT> ri;
1870  DER * tempseq = new DER((*spSeq)(ri, ci));
1871  return SpParMat<IT,NT,DER> (tempseq, commGrid);
1872 }
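 /* Usage sketch (illustrative; A and PSpMat are assumptions). Because ci is handed
    straight to the sequential indexing operator above, each process supplies its own
    local column indices, and an empty ri selects all rows:

      std::vector<int64_t> ci = {0, 2, 5};      // local column ids kept on this process
      PSpMat Acols = A.SubsRefCol(ci);
 */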
1873 
1881 template <class IT, class NT, class DER>
1882 template <typename PTNTBOOL, typename PTBOOLNT>
1883 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::SubsRef_SR (const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci, bool inplace)
1884 {
1885  // infer the concrete type SpMat<IT,IT>
1886  typedef typename create_trait<DER, IT, bool>::T_inferred DER_IT;
1887 
1888  if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
1889  {
1890  SpParHelper::Print("Grids are not comparable, SpRef fails !");
1891  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
1892  }
1893 
1894  // Safety check
1895  IT locmax_ri = 0;
1896  IT locmax_ci = 0;
1897  if(!ri.arr.empty())
1898  locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
1899  if(!ci.arr.empty())
1900  locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());
1901 
1902  IT total_m = getnrow();
1903  IT total_n = getncol();
1904  if(locmax_ri > total_m || locmax_ci > total_n)
1905  {
1906  throw outofrangeexception();
1907  }
1908 
1909  // The indices for FullyDistVec are offset'd to 1/p pieces
1910  // The matrix indices are offset'd to 1/sqrt(p) pieces
1911  // Add the corresponding offset before sending the data
1912  IT roffset = ri.RowLenUntil();
1913  IT rrowlen = ri.MyRowLength();
1914  IT coffset = ci.RowLenUntil();
1915  IT crowlen = ci.MyRowLength();
1916 
1917  // We create two boolean matrices P and Q
1918  // Dimensions: P is size(ri) x m
1919  // Q is n x size(ci)
1920  // Range(ri) = {0,...,m-1}
1921  // Range(ci) = {0,...,n-1}
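 // Worked example (illustrative): with m = 6 and ri = {3,0,5}, P is 3-by-6 with ones at
 // (0,3), (1,0) and (2,5), so P*A pulls out rows 3, 0 and 5 of A in that order; Q plays
 // the symmetric role for the columns listed in ci.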
1922 
1923  IT rowneighs = commGrid->GetGridCols(); // number of neighbors along this processor row (including oneself)
1924  IT totalm = getnrow(); // collective call
1925  IT totaln = getncol();
1926  IT m_perproccol = totalm / rowneighs;
1927  IT n_perproccol = totaln / rowneighs;
1928 
1929  // Get the right local dimensions
1930  IT diagneigh = commGrid->GetComplementRank();
1931  IT mylocalrows = getlocalrows();
1932  IT mylocalcols = getlocalcols();
1933  IT trlocalrows;
1934  MPI_Status status;
1935  MPI_Sendrecv(&mylocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, &trlocalrows, 1, MPIType<IT>(), diagneigh, TRROWX, commGrid->GetWorld(), &status);
1936  // we don't need trlocalcols because Q.Transpose() will take care of it
1937 
1938  std::vector< std::vector<IT> > rowid(rowneighs); // reuse for P and Q
1939  std::vector< std::vector<IT> > colid(rowneighs);
1940 
1941  // Step 1: Create P
1942  IT locvec = ri.arr.size(); // nnz in local vector
1943  for(typename std::vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
1944  {
1945  // numerical values (permutation indices) are 0-based
1946  // recipient along the processor row
1947  IT rowrec = (m_perproccol!=0) ? std::min(ri.arr[i] / m_perproccol, rowneighs-1) : (rowneighs-1);
1948 
1949  // ri's numerical values give the colids and its local indices give rowids
1950  rowid[rowrec].push_back( i + roffset);
1951  colid[rowrec].push_back(ri.arr[i] - (rowrec * m_perproccol));
1952  }
1953 
1954  int * sendcnt = new int[rowneighs]; // reuse in Q as well
1955  int * recvcnt = new int[rowneighs];
1956  for(IT i=0; i<rowneighs; ++i)
1957  sendcnt[i] = rowid[i].size();
1958 
1959  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld()); // share the counts
1960  int * sdispls = new int[rowneighs]();
1961  int * rdispls = new int[rowneighs]();
1962  std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
1963  std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
1964  IT p_nnz = std::accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));
1965 
1966  // create space for incoming data ...
1967  IT * p_rows = new IT[p_nnz];
1968  IT * p_cols = new IT[p_nnz];
1969  IT * senddata = new IT[locvec]; // re-used for both rows and columns
1970  for(int i=0; i<rowneighs; ++i)
1971  {
1972  std::copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
1973  std::vector<IT>().swap(rowid[i]); // clear memory of rowid
1974  }
1975  MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
1976 
1977  for(int i=0; i<rowneighs; ++i)
1978  {
1979  std::copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
1980  std::vector<IT>().swap(colid[i]); // clear memory of colid
1981  }
1982  MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), p_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
1983  delete [] senddata;
1984 
1985  std::tuple<IT,IT,bool> * p_tuples = new std::tuple<IT,IT,bool>[p_nnz];
1986  for(IT i=0; i< p_nnz; ++i)
1987  {
1988  p_tuples[i] = std::make_tuple(p_rows[i], p_cols[i], 1);
1989  }
1990  DeleteAll(p_rows, p_cols);
1991 
1992  DER_IT * PSeq = new DER_IT();
1993  PSeq->Create( p_nnz, rrowlen, trlocalrows, p_tuples); // deletion of tuples[] is handled by SpMat::Create
1994 
1995  SpParMat<IT,NT,DER> PA(commGrid);
1996  if(&ri == &ci) // Symmetric permutation
1997  {
1998  DeleteAll(sendcnt, recvcnt, sdispls, rdispls);
1999  #ifdef SPREFDEBUG
2000  SpParHelper::Print("Symmetric permutation\n", commGrid->GetWorld());
2001  #endif
2002  SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
2003  if(inplace)
2004  {
2005  #ifdef SPREFDEBUG
2006  SpParHelper::Print("In place multiplication\n", commGrid->GetWorld());
2007  #endif
2008  *this = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this, false, true); // clear the memory of *this
2009 
2010  //ostringstream outb;
2011  //outb << "P_after_" << commGrid->myrank;
2012  //ofstream ofb(outb.str().c_str());
2013  //P.put(ofb);
2014 
2015  P.Transpose();
2016  *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(*this, P, true, true); // clear the memory of both *this and P
2017  return SpParMat<IT,NT,DER>(commGrid); // dummy return to match signature
2018  }
2019  else
2020  {
2021  PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P,*this);
2022  P.Transpose();
2023  return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, P);
2024  }
2025  }
2026  else
2027  {
2028  // Intermediate step (to save memory): Form PA and store it in P
2029  // Distributed matrix generation (collective call)
2030  SpParMat<IT,bool,DER_IT> P (PSeq, commGrid);
2031 
2032  // Do parallel matrix-matrix multiply
2033  PA = Mult_AnXBn_DoubleBuff<PTBOOLNT, NT, DER>(P, *this);
2034  } // P is destructed here
2035 #ifndef NDEBUG
2036  PA.PrintInfo();
2037 #endif
2038  // Step 2: Create Q (use the same row-wise communication and transpose at the end)
2039  // This temporary to-be-transposed Q is size(ci) x n
2040  locvec = ci.arr.size(); // nnz in local vector (reset variable)
2041  for(typename std::vector<IT>::size_type i=0; i< (unsigned)locvec; ++i)
2042  {
2043  // numerical values (permutation indices) are 0-based
2044  IT rowrec = (n_perproccol!=0) ? std::min(ci.arr[i] / n_perproccol, rowneighs-1) : (rowneighs-1);
2045 
2046  // ci's numerical values give the colids and its local indices give rowids
2047  rowid[rowrec].push_back( i + coffset);
2048  colid[rowrec].push_back(ci.arr[i] - (rowrec * n_perproccol));
2049  }
2050 
2051  for(IT i=0; i<rowneighs; ++i)
2052  sendcnt[i] = rowid[i].size(); // update with new sizes
2053 
2054  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetRowWorld()); // share the counts
2055  std::fill(sdispls, sdispls+rowneighs, 0); // reset
2056  std::fill(rdispls, rdispls+rowneighs, 0);
2057  std::partial_sum(sendcnt, sendcnt+rowneighs-1, sdispls+1);
2058  std::partial_sum(recvcnt, recvcnt+rowneighs-1, rdispls+1);
2059  IT q_nnz = std::accumulate(recvcnt,recvcnt+rowneighs, static_cast<IT>(0));
2060 
2061  // create space for incoming data ...
2062  IT * q_rows = new IT[q_nnz];
2063  IT * q_cols = new IT[q_nnz];
2064  senddata = new IT[locvec];
2065  for(int i=0; i<rowneighs; ++i)
2066  {
2067  std::copy(rowid[i].begin(), rowid[i].end(), senddata+sdispls[i]);
2068  std::vector<IT>().swap(rowid[i]); // clear memory of rowid
2069  }
2070  MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_rows, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
2071 
2072  for(int i=0; i<rowneighs; ++i)
2073  {
2074  std::copy(colid[i].begin(), colid[i].end(), senddata+sdispls[i]);
2075  std::vector<IT>().swap(colid[i]); // clear memory of colid
2076  }
2077  MPI_Alltoallv(senddata, sendcnt, sdispls, MPIType<IT>(), q_cols, recvcnt, rdispls, MPIType<IT>(), commGrid->GetRowWorld());
2078  DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
2079 
2080  std::tuple<IT,IT,bool> * q_tuples = new std::tuple<IT,IT,bool>[q_nnz];
2081  for(IT i=0; i< q_nnz; ++i)
2082  {
2083  q_tuples[i] = std::make_tuple(q_rows[i], q_cols[i], 1);
2084  }
2085  DeleteAll(q_rows, q_cols);
2086  DER_IT * QSeq = new DER_IT();
2087  QSeq->Create( q_nnz, crowlen, mylocalcols, q_tuples); // Creating Q' instead
2088 
2089  // Step 3: Form PAQ
2090  // Distributed matrix generation (collective call)
2091  SpParMat<IT,bool,DER_IT> Q (QSeq, commGrid);
2092  Q.Transpose();
2093  if(inplace)
2094  {
2095  *this = Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q, true, true); // clear the memory of both PA and Q
2096  return SpParMat<IT,NT,DER>(commGrid); // dummy return to match signature
2097  }
2098  else
2099  {
2100  return Mult_AnXBn_DoubleBuff<PTNTBOOL, NT, DER>(PA, Q);
2101  }
2102 }
2103 
2104 
2105 template <class IT, class NT, class DER>
2106 void SpParMat<IT,NT,DER>::SpAsgn(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci, SpParMat<IT,NT,DER> & B)
2107 {
2108  typedef PlusTimesSRing<NT, NT> PTRing;
2109 
2110  if((*(ri.commGrid) != *(B.commGrid)) || (*(ci.commGrid) != *(B.commGrid)))
2111  {
2112  SpParHelper::Print("Grids are not comparable, SpAsgn fails !", commGrid->GetWorld());
2113  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2114  }
2115  IT total_m_A = getnrow();
2116  IT total_n_A = getncol();
2117  IT total_m_B = B.getnrow();
2118  IT total_n_B = B.getncol();
2119 
2120  if(total_m_B != ri.TotalLength())
2121  {
2122  SpParHelper::Print("First dimension of B does NOT match the length of ri, SpAsgn fails !", commGrid->GetWorld());
2123  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2124  }
2125  if(total_n_B != ci.TotalLength())
2126  {
2127  SpParHelper::Print("Second dimension of B does NOT match the length of ci, SpAsgn fails !", commGrid->GetWorld());
2128  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2129  }
2130  Prune(ri, ci); // make a hole
2131 
2132  // embed B to the size of A
2133  FullyDistVec<IT,IT> * rvec = new FullyDistVec<IT,IT>(ri.commGrid);
2134  rvec->iota(total_m_B, 0); // sparse() expects a zero based index
2135 
2136  SpParMat<IT,NT,DER> R(total_m_A, total_m_B, ri, *rvec, 1);
2137  delete rvec; // free memory
2138  SpParMat<IT,NT,DER> RB = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(R, B, true, false); // clear memory of R but not B
2139 
2140  FullyDistVec<IT,IT> * qvec = new FullyDistVec<IT,IT>(ri.commGrid);
2141  qvec->iota(total_n_B, 0);
2142  SpParMat<IT,NT,DER> Q(total_n_B, total_n_A, *qvec, ci, 1);
2143  delete qvec; // free memory
2144  SpParMat<IT,NT,DER> RBQ = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(RB, Q, true, true); // clear memory of RB and Q
2145  *this += RBQ; // extend-add
2146 }
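 /* Usage sketch (illustrative; A, B, ri and ci are assumptions). Overwrite the submatrix
    of A addressed by the row indices ri and column indices ci with B, which must be
    ri.TotalLength()-by-ci.TotalLength():

      A.SpAsgn(ri, ci, B);     // conceptually A(ri,ci) = B
 */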
2147 
2148 template <class IT, class NT, class DER>
2149 void SpParMat<IT,NT,DER>::Prune(const FullyDistVec<IT,IT> & ri, const FullyDistVec<IT,IT> & ci)
2150 {
2151  typedef PlusTimesSRing<NT, NT> PTRing;
2152 
2153  if((*(ri.commGrid) != *(commGrid)) || (*(ci.commGrid) != *(commGrid)))
2154  {
2155  SpParHelper::Print("Grids are not comparable, Prune fails!\n", commGrid->GetWorld());
2156  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2157  }
2158 
2159  // Safety check
2160  IT locmax_ri = 0;
2161  IT locmax_ci = 0;
2162  if(!ri.arr.empty())
2163  locmax_ri = *std::max_element(ri.arr.begin(), ri.arr.end());
2164  if(!ci.arr.empty())
2165  locmax_ci = *std::max_element(ci.arr.begin(), ci.arr.end());
2166 
2167  IT total_m = getnrow();
2168  IT total_n = getncol();
2169  if(locmax_ri > total_m || locmax_ci > total_n)
2170  {
2171  throw outofrangeexception();
2172  }
2173 
2174  SpParMat<IT,NT,DER> S(total_m, total_m, ri, ri, 1);
2175  SpParMat<IT,NT,DER> SA = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(S, *this, true, false); // clear memory of S but not *this
2176 
2177  SpParMat<IT,NT,DER> T(total_n, total_n, ci, ci, 1);
2178  SpParMat<IT,NT,DER> SAT = Mult_AnXBn_DoubleBuff<PTRing, NT, DER>(SA, T, true, true); // clear memory of SA and T
2179  EWiseMult(SAT, true); // In-place EWiseMult with not(SAT)
2180 }
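 /* Usage sketch (illustrative; A, ri and ci are assumptions). Drop every stored entry
    whose row index appears in ri and whose column index appears in ci -- the "hole"
    that SpAsgn punches before inserting B:

      A.Prune(ri, ci);
 */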
2181 
2183 template <class IT, class NT, class DER>
2184 template <typename _BinaryOperation>
2185 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::PruneColumn(const FullyDistVec<IT,NT> & pvals, _BinaryOperation __binary_op, bool inPlace)
2186 {
2187  MPI_Barrier(MPI_COMM_WORLD);
2188  if(getncol() != pvals.TotalLength())
2189  {
2190  std::ostringstream outs;
2191  outs << "Cannot prune column-by-column, dimensions do not match"<< std::endl;
2192  outs << getncol() << " != " << pvals.TotalLength() << std::endl;
2193  SpParHelper::Print(outs.str());
2194  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2195  }
2196  if(! ( *(getcommgrid()) == *(pvals.getcommgrid())) )
2197  {
2198  std::cout << "Grids are not comparable for PruneColumn" << std::endl;
2199  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2200  }
2201 
2202  MPI_Comm World = pvals.commGrid->GetWorld();
2203  MPI_Comm ColWorld = pvals.commGrid->GetColWorld();
2204 
2205  int xsize = (int) pvals.LocArrSize();
2206  int trxsize = 0;
2207 
2208 
2209  int diagneigh = pvals.commGrid->GetComplementRank();
2210  MPI_Status status;
2211  MPI_Sendrecv(&xsize, 1, MPI_INT, diagneigh, TRX, &trxsize, 1, MPI_INT, diagneigh, TRX, World, &status);
2212 
2213 
2214  NT * trxnums = new NT[trxsize];
2215  MPI_Sendrecv(const_cast<NT*>(SpHelper::p2a(pvals.arr)), xsize, MPIType<NT>(), diagneigh, TRX, trxnums, trxsize, MPIType<NT>(), diagneigh, TRX, World, &status);
2216 
2217  int colneighs, colrank;
2218  MPI_Comm_size(ColWorld, &colneighs);
2219  MPI_Comm_rank(ColWorld, &colrank);
2220  int * colsize = new int[colneighs];
2221  colsize[colrank] = trxsize;
2222  MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colsize, 1, MPI_INT, ColWorld);
2223  int * dpls = new int[colneighs](); // displacements (zero initialized)
2224  std::partial_sum(colsize, colsize+colneighs-1, dpls+1);
2225  int accsize = std::accumulate(colsize, colsize+colneighs, 0);
2226  std::vector<NT> numacc(accsize);
2227 
2228 #ifdef COMBBLAS_DEBUG
2229  std::ostringstream outs2;
2230  outs2 << "PruneColumn displacements: ";
2231  for(int i=0; i< colneighs; ++i)
2232  {
2233  outs2 << dpls[i] << " ";
2234  }
2235  outs2 << std::endl;
2236  SpParHelper::Print(outs2.str());
2237  MPI_Barrier(World);
2238 #endif
2239 
2240 
2241  MPI_Allgatherv(trxnums, trxsize, MPIType<NT>(), numacc.data(), colsize, dpls, MPIType<NT>(), ColWorld);
2242  delete [] trxnums;
2243  delete [] colsize;
2244  delete [] dpls;
2245 
2246  //sanity check
2247  assert(accsize == getlocalcols());
2248  if (inPlace)
2249  {
2250  spSeq->PruneColumn(numacc.data(), __binary_op, inPlace);
2251  return SpParMat<IT,NT,DER>(getcommgrid()); // return blank to match signature
2252  }
2253  else
2254  {
2255  return SpParMat<IT,NT,DER>(spSeq->PruneColumn(numacc.data(), __binary_op, inPlace), commGrid);
2256  }
2257 }
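 /* Usage sketch (illustrative; the names below and the exact predicate orientation are
    assumptions -- the decision is delegated to the sequential PruneColumn, read here as
    dropping A(i,j) whenever __binary_op(A(i,j), pvals[j]) holds):

      FullyDistVec<int64_t,double> thresholds(fullWorld, A.getncol(), 1e-6);
      A.PruneColumn(thresholds, std::less<double>(), true);  // in place: drop sub-threshold entries per column
 */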
2258 
2259 
2262 template <class IT, class NT, class DER>
2263 template <typename _BinaryOperation>
2264 SpParMat<IT,NT,DER> SpParMat<IT,NT,DER>::PruneColumn(const FullyDistSpVec<IT,NT> & pvals, _BinaryOperation __binary_op, bool inPlace)
2265 {
2266  MPI_Barrier(MPI_COMM_WORLD);
2267  if(getncol() != pvals.TotalLength())
2268  {
2269  std::ostringstream outs;
2270  outs << "Cannot prune column-by-column, dimensions do not match"<< std::endl;
2271  outs << getncol() << " != " << pvals.TotalLength() << std::endl;
2272  SpParHelper::Print(outs.str());
2273  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2274  }
2275  if(! ( *(getcommgrid()) == *(pvals.getcommgrid())) )
2276  {
2277  std::cout << "Grids are not comparable for PruneColumn" << std::endl;
2278  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2279  }
2280 
2281  MPI_Comm World = pvals.commGrid->GetWorld();
2282  MPI_Comm ColWorld = pvals.commGrid->GetColWorld();
2283  int diagneigh = pvals.commGrid->GetComplementRank();
2284 
2285  IT xlocnz = pvals.getlocnnz();
2286  IT roffst = pvals.RowLenUntil();
2287  IT roffset;
2288  IT trxlocnz = 0;
2289 
2290  MPI_Status status;
2291  MPI_Sendrecv(&roffst, 1, MPIType<IT>(), diagneigh, TROST, &roffset, 1, MPIType<IT>(), diagneigh, TROST, World, &status);
2292  MPI_Sendrecv(&xlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, &trxlocnz, 1, MPIType<IT>(), diagneigh, TRNNZ, World, &status);
2293 
2294  std::vector<IT> trxinds (trxlocnz);
2295  std::vector<NT> trxnums (trxlocnz);
2296  MPI_Sendrecv(pvals.ind.data(), xlocnz, MPIType<IT>(), diagneigh, TRI, trxinds.data(), trxlocnz, MPIType<IT>(), diagneigh, TRI, World, &status);
2297  MPI_Sendrecv(pvals.num.data(), xlocnz, MPIType<NT>(), diagneigh, TRX, trxnums.data(), trxlocnz, MPIType<NT>(), diagneigh, TRX, World, &status);
2298  std::transform(trxinds.data(), trxinds.data()+trxlocnz, trxinds.data(), std::bind2nd(std::plus<IT>(), roffset));
2299 
2300 
2301  int colneighs, colrank;
2302  MPI_Comm_size(ColWorld, &colneighs);
2303  MPI_Comm_rank(ColWorld, &colrank);
2304  int * colnz = new int[colneighs];
2305  colnz[colrank] = trxlocnz;
2306  MPI_Allgather(MPI_IN_PLACE, 1, MPI_INT, colnz, 1, MPI_INT, ColWorld);
2307  int * dpls = new int[colneighs](); // displacements (zero initialized)
2308  std::partial_sum(colnz, colnz+colneighs-1, dpls+1);
2309  IT accnz = std::accumulate(colnz, colnz+colneighs, 0);
2310 
2311  std::vector<IT> indacc(accnz);
2312  std::vector<NT> numacc(accnz);
2313  MPI_Allgatherv(trxinds.data(), trxlocnz, MPIType<IT>(), indacc.data(), colnz, dpls, MPIType<IT>(), ColWorld);
2314  MPI_Allgatherv(trxnums.data(), trxlocnz, MPIType<NT>(), numacc.data(), colnz, dpls, MPIType<NT>(), ColWorld);
2315 
2316  delete [] colnz;
2317  delete [] dpls;
2318 
2319 
2320  if (inPlace)
2321  {
2322  spSeq->PruneColumn(indacc.data(), numacc.data(), __binary_op, inPlace);
2323  return SpParMat<IT,NT,DER>(getcommgrid()); // return blank to match signature
2324  }
2325  else
2326  {
2327  return SpParMat<IT,NT,DER>(spSeq->PruneColumn(indacc.data(), numacc.data(), __binary_op, inPlace), commGrid);
2328  }
2329 }
2330 
2331 
2332 
2333 // In-place version where rhs type is the same (no need for type promotion)
2334 template <class IT, class NT, class DER>
2335 void SpParMat<IT,NT,DER>::EWiseMult (const SpParMat< IT,NT,DER > & rhs, bool exclude)
2336 {
2337  if(*commGrid == *rhs.commGrid)
2338  {
2339  spSeq->EWiseMult(*(rhs.spSeq), exclude); // Dimension compatibility check performed by sequential function
2340  }
2341  else
2342  {
2343  std::cout << "Grids are not comparable, EWiseMult() fails !" << std::endl;
2344  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2345  }
2346 }
2347 
2348 
2349 template <class IT, class NT, class DER>
2350 void SpParMat<IT,NT,DER>::EWiseScale(const DenseParMat<IT, NT> & rhs)
2351 {
2352  if(*commGrid == *rhs.commGrid)
2353  {
2354  spSeq->EWiseScale(rhs.array, rhs.m, rhs.n); // Dimension compatibility check performed by sequential function
2355  }
2356  else
2357  {
2358  std::cout << "Grids are not comparable, EWiseScale() fails !" << std::endl;
2359  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2360  }
2361 }
2362 
2363 template <class IT, class NT, class DER>
2364 template <typename _BinaryOperation>
2365 void SpParMat<IT,NT,DER>::UpdateDense(DenseParMat<IT, NT> & rhs, _BinaryOperation __binary_op) const
2366 {
2367  if(*commGrid == *rhs.commGrid)
2368  {
2369  if(getlocalrows() == rhs.m && getlocalcols() == rhs.n)
2370  {
2371  spSeq->UpdateDense(rhs.array, __binary_op);
2372  }
2373  else
2374  {
2375  std::cout << "Matrices have different dimensions, UpdateDense() fails !" << std::endl;
2376  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2377  }
2378  }
2379  else
2380  {
2381  std::cout << "Grids are not comparable, UpdateDense() fails !" << std::endl;
2382  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2383  }
2384 }
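 /* Usage sketch (illustrative; D is an assumption -- a DenseParMat<int64_t,double> on the
    same grid with matching local dimensions). For every stored entry A(i,j), the matching
    dense entry is combined with it through the supplied operation:

      A.UpdateDense(D, std::plus<double>());   // D(i,j) += A(i,j) at the nonzero positions of A
 */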
2385 
2386 template <class IT, class NT, class DER>
2387 void SpParMat<IT,NT,DER>::PrintInfo() const
2388 {
2389  IT mm = getnrow();
2390  IT nn = getncol();
2391  IT nznz = getnnz();
2392 
2393  if (commGrid->myrank == 0)
2394  std::cout << "As a whole: " << mm << " rows and "<< nn <<" columns and "<< nznz << " nonzeros" << std::endl;
2395 
2396 #ifdef DEBUG
2397  IT allprocs = commGrid->grrows * commGrid->grcols;
2398  for(IT i=0; i< allprocs; ++i)
2399  {
2400  if (commGrid->myrank == i)
2401  {
2402  std::cout << "Processor (" << commGrid->GetRankInProcRow() << "," << commGrid->GetRankInProcCol() << ")'s data: " << std::endl;
2403  spSeq->PrintInfo();
2404  }
2405  MPI_Barrier(commGrid->GetWorld());
2406  }
2407 #endif
2408 }
2409 
2410 template <class IT, class NT, class DER>
2411 bool SpParMat<IT,NT,DER>::operator== (const SpParMat<IT,NT,DER> & rhs) const
2412 {
2413  int local = static_cast<int>((*spSeq) == (*(rhs.spSeq)));
2414  int whole = 1;
2415  MPI_Allreduce( &local, &whole, 1, MPI_INT, MPI_BAND, commGrid->GetWorld());
2416  return static_cast<bool>(whole);
2417 }
2418 
2419 
2424 template <class IT, class NT, class DER>
2425 template <typename _BinaryOperation, typename LIT>
2426 void SpParMat< IT,NT,DER >::SparseCommon(std::vector< std::vector < std::tuple<LIT,LIT,NT> > > & data, LIT locsize, IT total_m, IT total_n, _BinaryOperation BinOp)
2427 {
2428  int nprocs = commGrid->GetSize();
2429  int * sendcnt = new int[nprocs];
2430  int * recvcnt = new int[nprocs];
2431  for(int i=0; i<nprocs; ++i)
2432  sendcnt[i] = data[i].size(); // number of tuples to be sent to processor i
2433 
2434  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
2435  int * sdispls = new int[nprocs]();
2436  int * rdispls = new int[nprocs]();
2437  std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
2438  std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
2439  IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
2440  IT totsent = std::accumulate(sendcnt,sendcnt+nprocs, static_cast<IT>(0));
2441 
2442  assert((totsent < std::numeric_limits<int>::max()));
2443  assert((totrecv < std::numeric_limits<int>::max()));
2444 
2445 
2446 #if 0
2447  ofstream oput;
2448  commGrid->OpenDebugFile("Displacements", oput);
2449  copy(sdispls, sdispls+nprocs, ostream_iterator<int>(oput, " ")); oput << endl;
2450  copy(rdispls, rdispls+nprocs, ostream_iterator<int>(oput, " ")); oput << endl;
2451  oput.close();
2452 
2453  IT * gsizes;
2454  if(commGrid->GetRank() == 0) gsizes = new IT[nprocs];
2455  MPI_Gather(&totrecv, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetWorld());
2456  if(commGrid->GetRank() == 0) { std::copy(gsizes, gsizes+nprocs, std::ostream_iterator<IT>(std::cout, " ")); std::cout << std::endl; }
2457  MPI_Barrier(commGrid->GetWorld());
2458  MPI_Gather(&totsent, 1, MPIType<IT>(), gsizes, 1, MPIType<IT>(), 0, commGrid->GetWorld());
2459  if(commGrid->GetRank() == 0) { copy(gsizes, gsizes+nprocs, ostream_iterator<IT>(cout, " ")); cout << endl; }
2460  MPI_Barrier(commGrid->GetWorld());
2461  if(commGrid->GetRank() == 0) delete [] gsizes;
2462 #endif
2463 
2464  std::tuple<LIT,LIT,NT> * senddata = new std::tuple<LIT,LIT,NT>[locsize]; // re-used for both rows and columns
2465  for(int i=0; i<nprocs; ++i)
2466  {
2467  std::copy(data[i].begin(), data[i].end(), senddata+sdispls[i]);
2468  data[i].clear(); // clear memory
2469  data[i].shrink_to_fit();
2470  }
2471  MPI_Datatype MPI_triple;
2472  MPI_Type_contiguous(sizeof(std::tuple<LIT,LIT,NT>), MPI_CHAR, &MPI_triple);
2473  MPI_Type_commit(&MPI_triple);
2474 
2475  std::tuple<LIT,LIT,NT> * recvdata = new std::tuple<LIT,LIT,NT>[totrecv];
2476  MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_triple, recvdata, recvcnt, rdispls, MPI_triple, commGrid->GetWorld());
2477 
2478  DeleteAll(senddata, sendcnt, recvcnt, sdispls, rdispls);
2479  MPI_Type_free(&MPI_triple);
2480 
2481  int r = commGrid->GetGridRows();
2482  int s = commGrid->GetGridCols();
2483  IT m_perproc = total_m / r;
2484  IT n_perproc = total_n / s;
2485  int myprocrow = commGrid->GetRankInProcCol();
2486  int myproccol = commGrid->GetRankInProcRow();
2487  IT locrows, loccols;
2488  if(myprocrow != r-1) locrows = m_perproc;
2489  else locrows = total_m - myprocrow * m_perproc;
2490  if(myproccol != s-1) loccols = n_perproc;
2491  else loccols = total_n - myproccol * n_perproc;
2492 
2493  SpTuples<LIT,NT> A(totrecv, locrows, loccols, recvdata); // It is ~SpTuples's job to deallocate
2494 
2495  // the previous constructor sorts based on columns-first (but that doesn't matter as long as they are sorted one way or another)
2496  A.RemoveDuplicates(BinOp);
2497 
2498  spSeq = new DER(A,false); // Convert SpTuples to DER
2499 }
2500 
2501 
2503 template <class IT, class NT, class DER>
2504 SpParMat< IT,NT,DER >::SpParMat (IT total_m, IT total_n, const FullyDistVec<IT,IT> & distrows,
2505  const FullyDistVec<IT,IT> & distcols, const FullyDistVec<IT,NT> & distvals, bool SumDuplicates)
2506 {
2507  if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
2508  {
2509  SpParHelper::Print("Grids are not comparable, Sparse() fails!\n"); // commGrid is not initialized yet
2510  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2511  }
2512  if((distrows.TotalLength() != distcols.TotalLength()) || (distcols.TotalLength() != distvals.TotalLength()))
2513  {
2514  SpParHelper::Print("Vectors have different sizes, Sparse() fails!");
2515  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2516  }
2517 
2518  commGrid = distrows.commGrid;
2519  int nprocs = commGrid->GetSize();
2520  std::vector< std::vector < std::tuple<IT,IT,NT> > > data(nprocs);
2521 
2522  IT locsize = distrows.LocArrSize();
2523  for(IT i=0; i<locsize; ++i)
2524  {
2525  IT lrow, lcol;
2526  int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
2527  data[owner].push_back(std::make_tuple(lrow,lcol,distvals.arr[i]));
2528  }
2529  if(SumDuplicates)
2530  {
2531  SparseCommon(data, locsize, total_m, total_n, std::plus<NT>());
2532  }
2533  else
2534  {
2535  SparseCommon(data, locsize, total_m, total_n, maximum<NT>());
2536  }
2537 }
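 /* Usage sketch (illustrative; every name below is an assumption). Build a matrix from
    globally indexed (row, column, value) triples held in three aligned FullyDistVec objects:

      FullyDistVec<int64_t,int64_t> rows(fullWorld), cols(fullWorld);
      FullyDistVec<int64_t,double>  vals(fullWorld);
      // ... fill the vectors so that triple t is (rows[t], cols[t], vals[t]) ...
      PSpMat A(nrow, ncol, rows, cols, vals, true);   // true: sum duplicate coordinates
 */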
2538 
2539 
2540 
2541 template <class IT, class NT, class DER>
2542 SpParMat< IT,NT,DER >::SpParMat (IT total_m, IT total_n, const FullyDistVec<IT,IT> & distrows,
2543  const FullyDistVec<IT,IT> & distcols, const NT & val, bool SumDuplicates)
2544 {
2545  if((*(distrows.commGrid) != *(distcols.commGrid)) )
2546  {
2547  SpParHelper::Print("Grids are not comparable, Sparse() fails!\n");
2548  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
2549  }
2550  if((distrows.TotalLength() != distcols.TotalLength()) )
2551  {
2552  SpParHelper::Print("Vectors have different sizes, Sparse() fails!\n");
2553  MPI_Abort(MPI_COMM_WORLD, DIMMISMATCH);
2554  }
2555  commGrid = distrows.commGrid;
2556  int nprocs = commGrid->GetSize();
2557  std::vector< std::vector < std::tuple<IT,IT,NT> > > data(nprocs);
2558 
2559  IT locsize = distrows.LocArrSize();
2560  for(IT i=0; i<locsize; ++i)
2561  {
2562  IT lrow, lcol;
2563  int owner = Owner(total_m, total_n, distrows.arr[i], distcols.arr[i], lrow, lcol);
2564  data[owner].push_back(std::make_tuple(lrow,lcol,val));
2565  }
2566  if(SumDuplicates)
2567  {
2568  SparseCommon(data, locsize, total_m, total_n, std::plus<NT>());
2569  }
2570  else
2571  {
2572  SparseCommon(data, locsize, total_m, total_n, maximum<NT>());
2573  }
2574 }
2575 
2576 template <class IT, class NT, class DER>
2577 template <class DELIT>
2578 SpParMat< IT,NT,DER >::SpParMat (const DistEdgeList<DELIT> & DEL, bool removeloops)
2579 {
2580  commGrid = DEL.commGrid;
2581  typedef typename DER::LocalIT LIT;
2582 
2583  int nprocs = commGrid->GetSize();
2584  int gridrows = commGrid->GetGridRows();
2585  int gridcols = commGrid->GetGridCols();
2586  std::vector< std::vector<LIT> > data(nprocs); // entries are pre-converted to local indices before getting pushed into "data"
2587 
2588  LIT m_perproc = DEL.getGlobalV() / gridrows;
2589  LIT n_perproc = DEL.getGlobalV() / gridcols;
2590 
2591  if(sizeof(LIT) < sizeof(DELIT))
2592  {
2593  std::ostringstream outs;
2594  outs << "Warning: Using smaller indices for the matrix than DistEdgeList\n";
2595  outs << "Local matrices are " << m_perproc << "-by-" << n_perproc << std::endl;
2596  SpParHelper::Print(outs.str(), commGrid->GetWorld()); // commgrid initialized
2597  }
2598 
2599  LIT stages = MEM_EFFICIENT_STAGES; // to lower memory consumption, form sparse matrix in stages
2600 
2601  // even if local indices (LIT) are 32-bits, we should work with 64-bits for global info
2602  int64_t perstage = DEL.nedges / stages;
2603  LIT totrecv = 0;
2604  std::vector<LIT> alledges;
2605 
2606  for(LIT s=0; s< stages; ++s)
2607  {
2608  int64_t n_befor = s*perstage;
2609  int64_t n_after= ((s==(stages-1))? DEL.nedges : ((s+1)*perstage));
2610 
2611  // clear the source vertex by setting it to -1
2612  int realedges = 0; // these are "local" realedges
2613 
2614  if(DEL.pedges)
2615  {
2616  for (int64_t i = n_befor; i < n_after; i++)
2617  {
2618  int64_t fr = get_v0_from_edge(&(DEL.pedges[i]));
2619  int64_t to = get_v1_from_edge(&(DEL.pedges[i]));
2620 
2621  if(fr >= 0 && to >= 0) // otherwise skip
2622  {
2623  IT lrow, lcol;
2624  int owner = Owner(DEL.getGlobalV(), DEL.getGlobalV(), fr, to, lrow, lcol);
2625  data[owner].push_back(lrow); // row_id
2626  data[owner].push_back(lcol); // col_id
2627  ++realedges;
2628  }
2629  }
2630  }
2631  else
2632  {
2633  for (int64_t i = n_befor; i < n_after; i++)
2634  {
2635  if(DEL.edges[2*i+0] >= 0 && DEL.edges[2*i+1] >= 0) // otherwise skip
2636  {
2637  IT lrow, lcol;
2638  int owner = Owner(DEL.getGlobalV(), DEL.getGlobalV(), DEL.edges[2*i+0], DEL.edges[2*i+1], lrow, lcol);
2639  data[owner].push_back(lrow);
2640  data[owner].push_back(lcol);
2641  ++realedges;
2642  }
2643  }
2644  }
2645 
2646  LIT * sendbuf = new LIT[2*realedges];
2647  int * sendcnt = new int[nprocs];
2648  int * sdispls = new int[nprocs];
2649  for(int i=0; i<nprocs; ++i)
2650  sendcnt[i] = data[i].size();
2651 
2652  int * rdispls = new int[nprocs];
2653  int * recvcnt = new int[nprocs];
2654  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT,commGrid->GetWorld()); // share the counts
2655 
2656  sdispls[0] = 0;
2657  rdispls[0] = 0;
2658  for(int i=0; i<nprocs-1; ++i)
2659  {
2660  sdispls[i+1] = sdispls[i] + sendcnt[i];
2661  rdispls[i+1] = rdispls[i] + recvcnt[i];
2662  }
2663  for(int i=0; i<nprocs; ++i)
2664  std::copy(data[i].begin(), data[i].end(), sendbuf+sdispls[i]);
2665 
2666  // clear memory
2667  for(int i=0; i<nprocs; ++i)
2668  std::vector<LIT>().swap(data[i]);
2669 
2670  // ABAB: Total number of edges received might not be LIT-addressable
2671  // However, each edge_id is LIT-addressable
2672  IT thisrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0)); // thisrecv = 2*locedges
2673  LIT * recvbuf = new LIT[thisrecv];
2674  totrecv += thisrecv;
2675 
2676  MPI_Alltoallv(sendbuf, sendcnt, sdispls, MPIType<LIT>(), recvbuf, recvcnt, rdispls, MPIType<LIT>(), commGrid->GetWorld());
2677  DeleteAll(sendcnt, recvcnt, sdispls, rdispls,sendbuf);
2678  std::copy (recvbuf,recvbuf+thisrecv,std::back_inserter(alledges)); // copy to all edges
2679  delete [] recvbuf;
2680  }
2681 
2682  int myprocrow = commGrid->GetRankInProcCol();
2683  int myproccol = commGrid->GetRankInProcRow();
2684  LIT locrows, loccols;
2685  if(myprocrow != gridrows-1) locrows = m_perproc;
2686  else locrows = DEL.getGlobalV() - myprocrow * m_perproc;
2687  if(myproccol != gridcols-1) loccols = n_perproc;
2688  else loccols = DEL.getGlobalV() - myproccol * n_perproc;
2689 
2690  SpTuples<LIT,NT> A(totrecv/2, locrows, loccols, alledges, removeloops); // alledges is empty upon return
2691  spSeq = new DER(A,false); // Convert SpTuples to DER
2692 }
2693 
2694 template <class IT, class NT, class DER>
2695 IT SpParMat<IT,NT,DER>::RemoveLoops()
2696 {
2697  MPI_Comm DiagWorld = commGrid->GetDiagWorld();
2698  IT totrem;
2699  IT removed = 0;
2700  if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
2701  {
2702  SpTuples<IT,NT> tuples(*spSeq);
2703  delete spSeq;
2704  removed = tuples.RemoveLoops();
2705  spSeq = new DER(tuples, false); // Convert to DER
2706  }
2707  MPI_Allreduce( &removed, & totrem, 1, MPIType<IT>(), MPI_SUM, commGrid->GetWorld());
2708  return totrem;
2709 }
2710 
2711 
2712 
2713 template <class IT, class NT, class DER>
2714 void SpParMat<IT,NT,DER>::AddLoops(NT loopval, bool replaceExisting)
2715 {
2716  MPI_Comm DiagWorld = commGrid->GetDiagWorld();
2717  if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
2718  {
2719  typedef typename DER::LocalIT LIT;
2720  SpTuples<LIT,NT> tuples(*spSeq);
2721  delete spSeq;
2722  tuples.AddLoops(loopval, replaceExisting);
2723  tuples.SortColBased();
2724  spSeq = new DER(tuples, false); // Convert to DER
2725  }
2726 }
2727 
2728 
2729 // Different values on the diagonal
2730 template <class IT, class NT, class DER>
2731 void SpParMat<IT,NT,DER>::AddLoops(FullyDistVec<IT,NT> loopvals, bool replaceExisting)
2732 {
2733 
2734 
2735  if(*loopvals.commGrid != *commGrid)
2736  {
2737  SpParHelper::Print("Grids are not comparable, SpParMat::AddLoops() fails!\n", commGrid->GetWorld());
2738  MPI_Abort(MPI_COMM_WORLD,GRIDMISMATCH);
2739  }
2740  if (getncol()!= loopvals.TotalLength())
2741  {
2742  SpParHelper::Print("The number of entries in loopvals is not equal to the number of diagonal entries.\n");
2743  MPI_Abort(MPI_COMM_WORLD,DIMMISMATCH);
2744  }
2745 
2746  // Gather data on the diagonal processor
2747  IT locsize = loopvals.LocArrSize();
2748  int rowProcs = commGrid->GetGridCols();
2749  std::vector<int> recvcnt(rowProcs, 0);
2750  std::vector<int> rdpls(rowProcs, 0);
2751  MPI_Gather(&locsize, 1, MPI_INT, recvcnt.data(), 1, MPI_INT, commGrid->GetDiagOfProcRow(), commGrid->GetRowWorld());
2752  std::partial_sum(recvcnt.data(), recvcnt.data()+rowProcs-1, rdpls.data()+1);
2753 
2754  IT totrecv = rdpls[rowProcs-1] + recvcnt[rowProcs-1];
2755  assert((totrecv < std::numeric_limits<int>::max()));
2756 
2757  std::vector<NT> rowvals(totrecv);
2758  MPI_Gatherv(loopvals.arr.data(), locsize, MPIType<NT>(), rowvals.data(), recvcnt.data(), rdpls.data(),
2759  MPIType<NT>(), commGrid->GetDiagOfProcRow(), commGrid->GetRowWorld());
2760 
2761 
2762  MPI_Comm DiagWorld = commGrid->GetDiagWorld();
2763  if(DiagWorld != MPI_COMM_NULL) // Diagonal processors only
2764  {
2765  typedef typename DER::LocalIT LIT;
2766  SpTuples<LIT,NT> tuples(*spSeq);
2767  delete spSeq;
2768  tuples.AddLoops(rowvals, replaceExisting);
2769  tuples.SortColBased();
2770  spSeq = new DER(tuples, false); // Convert to DER
2771  }
2772 }
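 /* Usage sketch (illustrative; A is an assumption). Guarantee that every diagonal entry
    exists, e.g. before algorithms that expect self-loops:

      A.AddLoops(1.0, false);  // add A(i,i) = 1.0 only where the diagonal is currently empty
      A.AddLoops(1.0, true);   // or: overwrite every existing diagonal value with 1.0 as well
 */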
2773 
2774 
2778 template <class IT, class NT, class DER>
2779 template <typename LIT, typename OT>
2780 void SpParMat<IT,NT,DER>::OptimizeForGraph500(OptBuf<LIT,OT> & optbuf)
2781 {
2782  if(spSeq->getnsplit() > 0)
2783  {
2784  SpParHelper::Print("Can not declare preallocated buffers for multithreaded execution\n", commGrid->GetWorld());
2785  return;
2786  }
2787 
2788  typedef typename DER::LocalIT LocIT; // ABAB: should match the type of LIT. Check?
2789 
2790  // Set up communication buffers, one for all
2791  LocIT mA = spSeq->getnrow();
2792  LocIT nA = spSeq->getncol();
2793 
2794  int p_c = commGrid->GetGridCols();
2795  int p_r = commGrid->GetGridRows();
2796 
2797  LocIT rwperproc = mA / p_c; // per-processor share in row-wise communication
2798  LocIT cwperproc = nA / p_r; // per-processor share in column-wise communication
2799 
2800 #ifdef GATHERVOPT
2801  LocIT * colinds = seq->GetDCSC()->jc; // local nonzero column id's
2802  LocIT locnzc = seq->getnzc();
2803  LocIT cci = 0; // index to column id's array (cci: current column index)
2804  int * gsizes = NULL;
2805  IT * ents = NULL;
2806  IT * dpls = NULL;
2807  std::vector<LocIT> pack2send;
2808 
2809  FullyDistSpVec<IT,IT> dummyRHS ( commGrid, getncol()); // dummy RHS vector to estimate index start position
2810  IT recveclen;
2811 
2812  for(int pid = 1; pid <= p_r; pid++)
2813  {
2814  IT diagoffset;
2815  MPI_Status status;
2816  IT offset = dummyRHS.RowLenUntil(pid-1);
2817  int diagneigh = commGrid->GetComplementRank();
2818  MPI_Sendrecv(&offset, 1, MPIType<IT>(), diagneigh, TRTAGNZ, &diagoffset, 1, MPIType<IT>(), diagneigh, TRTAGNZ, commGrid->GetWorld(), &status);
2819 
2820  LocIT endind = (pid == p_r)? nA : static_cast<LocIT>(pid) * cwperproc; // the last one might have a larger share (is this fitting to the vector boundaries?)
2821  while(cci < locnzc && colinds[cci] < endind)
2822  {
2823  pack2send.push_back(colinds[cci++]-diagoffset);
2824  }
2825  if(pid-1 == myrank) gsizes = new int[p_r];
2826  MPI_Gather(&mysize, 1, MPI_INT, gsizes, 1, MPI_INT, pid-1, commGrid->GetColWorld());
2827  if(pid-1 == myrank)
2828  {
2829  IT colcnt = std::accumulate(gsizes, gsizes+p_r, static_cast<IT>(0));
2830  recvbuf = new IT[colcnt];
2831  dpls = new IT[p_r](); // displacements (zero initialized)
2832  std::partial_sum(gsizes, gsizes+p_r-1, dpls+1);
2833  }
2834 
2835  // int MPI_Gatherv (void* sbuf, int scount, MPI_Datatype stype, void* rbuf, int *rcount, int* displs, MPI_Datatype rtype, int root, MPI_Comm comm)
2836  MPI_Gatherv(SpHelper::p2a(pack2send), mysize, MPIType<LocIT>(), recvbuf, gsizes, dpls, MPIType<LocIT>(), pid-1, commGrid->GetColWorld());
2837  std::vector<LocIT>().swap(pack2send);
2838 
2839  if(pid-1 == myrank)
2840  {
2841  recveclen = dummyRHS.MyLocLength();
2842  std::vector< std::vector<LocIT> > service(recveclen);
2843  for(int i=0; i< p_r; ++i)
2844  {
2845  for(int j=0; j< gsizes[i]; ++j)
2846  {
2847  IT colid2update = recvbuf[dpls[i]+j];
2848  if(service[colid2update].size() < GATHERVNEIGHLIMIT)
2849  {
2850  service.push_back(i);
2851  }
2852  // else don't increase any further and mark it done after the iterations are complete
2853  }
2854  }
2855  }
2856  }
2857 #endif
2858 
2859 
2860  std::vector<bool> isthere(mA, false); // perhaps the only appropriate use of this crippled data structure
2861  std::vector<int> maxlens(p_c,0); // maximum data size to be sent to any neighbor along the processor row
2862 
2863  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit)
2864  {
2865  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
2866  {
2867  LocIT rowid = nzit.rowid();
2868  if(!isthere[rowid])
2869  {
2870  LocIT owner = std::min(nzit.rowid() / rwperproc, (LocIT) p_c-1);
2871  maxlens[owner]++;
2872  isthere[rowid] = true;
2873  }
2874  }
2875  }
2876  SpParHelper::Print("Optimization buffers set\n", commGrid->GetWorld());
2877  optbuf.Set(maxlens,mA);
2878 }
2879 
2880 template <class IT, class NT, class DER>
2881 void SpParMat<IT,NT,DER>::ActivateThreading(int numsplits)
2882 {
2883  spSeq->RowSplit(numsplits);
2884 }
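 /* Usage sketch (illustrative; A and the split count are assumptions). Split the local
    submatrix into independent row chunks so threaded kernels can work on them in parallel:

      A.ActivateThreading(4);  // typically chosen to match the number of OpenMP threads
 */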
2885 
2886 
2891 template <class IT, class NT, class DER>
2892 template <typename SR>
2893 void SpParMat<IT,NT,DER>::Square ()
2894 {
2895  int stages, dummy; // last two parameters of productgrid are ignored for synchronous multiplication
2896  std::shared_ptr<CommGrid> Grid = ProductGrid(commGrid.get(), commGrid.get(), stages, dummy, dummy);
2897 
2898  typedef typename DER::LocalIT LIT;
2899 
2900  LIT AA_m = spSeq->getnrow();
2901  LIT AA_n = spSeq->getncol();
2902 
2903  DER seqTrn = spSeq->TransposeConst(); // will be automatically discarded after going out of scope
2904 
2905  MPI_Barrier(commGrid->GetWorld());
2906 
2907  LIT ** NRecvSizes = SpHelper::allocate2D<LIT>(DER::esscount, stages);
2908  LIT ** TRecvSizes = SpHelper::allocate2D<LIT>(DER::esscount, stages);
2909 
2910  SpParHelper::GetSetSizes( *spSeq, NRecvSizes, commGrid->GetRowWorld());
2911  SpParHelper::GetSetSizes( seqTrn, TRecvSizes, commGrid->GetColWorld());
2912 
2913  // Remotely fetched matrices are stored as pointers
2914  DER * NRecv;
2915  DER * TRecv;
2916  std::vector< SpTuples<LIT,NT> *> tomerge;
2917 
2918  int Nself = commGrid->GetRankInProcRow();
2919  int Tself = commGrid->GetRankInProcCol();
2920 
2921  for(int i = 0; i < stages; ++i)
2922  {
2923  std::vector<LIT> ess;
2924  if(i == Nself) NRecv = spSeq; // shallow-copy
2925  else
2926  {
2927  ess.resize(DER::esscount);
2928  for(int j=0; j< DER::esscount; ++j)
2929  ess[j] = NRecvSizes[j][i]; // essentials of the ith matrix in this row
2930  NRecv = new DER(); // first, create the object
2931  }
2932 
2933  SpParHelper::BCastMatrix(Grid->GetRowWorld(), *NRecv, ess, i); // then, broadcast its elements
2934  ess.clear();
2935 
2936  if(i == Tself) TRecv = &seqTrn; // shallow-copy
2937  else
2938  {
2939  ess.resize(DER::esscount);
2940  for(int j=0; j< DER::esscount; ++j)
2941  ess[j] = TRecvSizes[j][i];
2942  TRecv = new DER();
2943  }
2944  SpParHelper::BCastMatrix(Grid->GetColWorld(), *TRecv, ess, i);
2945 
2946  SpTuples<LIT,NT> * AA_cont = MultiplyReturnTuples<SR, NT>(*NRecv, *TRecv, false, true);
2947  if(!AA_cont->isZero())
2948  tomerge.push_back(AA_cont);
2949 
2950  if(i != Nself) delete NRecv;
2951  if(i != Tself) delete TRecv;
2952  }
2953 
2954  SpHelper::deallocate2D(NRecvSizes, DER::esscount);
2955  SpHelper::deallocate2D(TRecvSizes, DER::esscount);
2956 
2957  delete spSeq;
2958  spSeq = new DER(MergeAll<SR>(tomerge, AA_m, AA_n), false); // First get the result in SpTuples, then convert to UDER
2959  for(unsigned int i=0; i<tomerge.size(); ++i)
2960  delete tomerge[i];
2961 }
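 /* Usage sketch (illustrative; A is an assumption). Replace A by A*A over the conventional
    (+,*) semiring:

      A.Square< PlusTimesSRing<double,double> >();
 */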
2962 
2963 
2964 template <class IT, class NT, class DER>
2965 void SpParMat<IT,NT,DER>::Transpose()
2966 {
2967  if(commGrid->myproccol == commGrid->myprocrow) // Diagonal
2968  {
2969  spSeq->Transpose();
2970  }
2971  else
2972  {
2973  typedef typename DER::LocalIT LIT;
2974  SpTuples<LIT,NT> Atuples(*spSeq);
2975  LIT locnnz = Atuples.getnnz();
2976  LIT * rows = new LIT[locnnz];
2977  LIT * cols = new LIT[locnnz];
2978  NT * vals = new NT[locnnz];
2979  for(LIT i=0; i < locnnz; ++i)
2980  {
2981  rows[i] = Atuples.colindex(i); // swap (i,j) here
2982  cols[i] = Atuples.rowindex(i);
2983  vals[i] = Atuples.numvalue(i);
2984  }
2985  LIT locm = getlocalcols();
2986  LIT locn = getlocalrows();
2987  delete spSeq;
2988 
2989  LIT remotem, remoten, remotennz;
2990  std::swap(locm,locn);
2991  int diagneigh = commGrid->GetComplementRank();
2992 
2993  MPI_Status status;
2994  MPI_Sendrecv(&locnnz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, &remotennz, 1, MPIType<LIT>(), diagneigh, TRTAGNZ, commGrid->GetWorld(), &status);
2995  MPI_Sendrecv(&locn, 1, MPIType<LIT>(), diagneigh, TRTAGM, &remotem, 1, MPIType<LIT>(), diagneigh, TRTAGM, commGrid->GetWorld(), &status);
2996  MPI_Sendrecv(&locm, 1, MPIType<LIT>(), diagneigh, TRTAGN, &remoten, 1, MPIType<LIT>(), diagneigh, TRTAGN, commGrid->GetWorld(), &status);
2997 
2998  LIT * rowsrecv = new LIT[remotennz];
2999  MPI_Sendrecv(rows, locnnz, MPIType<LIT>(), diagneigh, TRTAGROWS, rowsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGROWS, commGrid->GetWorld(), &status);
3000  delete [] rows;
3001 
3002  LIT * colsrecv = new LIT[remotennz];
3003  MPI_Sendrecv(cols, locnnz, MPIType<LIT>(), diagneigh, TRTAGCOLS, colsrecv, remotennz, MPIType<LIT>(), diagneigh, TRTAGCOLS, commGrid->GetWorld(), &status);
3004  delete [] cols;
3005 
3006  NT * valsrecv = new NT[remotennz];
3007  MPI_Sendrecv(vals, locnnz, MPIType<NT>(), diagneigh, TRTAGVALS, valsrecv, remotennz, MPIType<NT>(), diagneigh, TRTAGVALS, commGrid->GetWorld(), &status);
3008  delete [] vals;
3009 
3010  std::tuple<LIT,LIT,NT> * arrtuples = new std::tuple<LIT,LIT,NT>[remotennz];
3011  for(LIT i=0; i< remotennz; ++i)
3012  {
3013  arrtuples[i] = std::make_tuple(rowsrecv[i], colsrecv[i], valsrecv[i]);
3014  }
3015  DeleteAll(rowsrecv, colsrecv, valsrecv);
3016  ColLexiCompare<LIT,NT> collexicogcmp;
3017  sort(arrtuples , arrtuples+remotennz, collexicogcmp ); // sort w.r.t columns here
3018 
3019  spSeq = new DER();
3020  spSeq->Create( remotennz, remotem, remoten, arrtuples); // the deletion of arrtuples[] is handled by SpMat::Create
3021  }
3022 }
3023 
3024 
3025 template <class IT, class NT, class DER>
3026 template <class HANDLER>
3027 void SpParMat< IT,NT,DER >::SaveGathered(std::string filename, HANDLER handler, bool transpose) const
3028 {
3029  int proccols = commGrid->GetGridCols();
3030  int procrows = commGrid->GetGridRows();
3031  IT totalm = getnrow();
3032  IT totaln = getncol();
3033  IT totnnz = getnnz();
3034  int flinelen = 0;
3035  std::ofstream out;
3036  if(commGrid->GetRank() == 0)
3037  {
3038  std::string s;
3039  std::stringstream strm;
3040  strm << "%%MatrixMarket matrix coordinate real general" << std::endl;
3041  strm << totalm << " " << totaln << " " << totnnz << std::endl;
3042  s = strm.str();
3043  out.open(filename.c_str(),std::ios_base::trunc);
3044  flinelen = s.length();
3045  out.write(s.c_str(), flinelen);
3046  out.close();
3047  }
3048  int colrank = commGrid->GetRankInProcCol();
3049  int colneighs = commGrid->GetGridRows();
3050  IT * locnrows = new IT[colneighs]; // number of rows is calculated by a reduction among the processor column
3051  locnrows[colrank] = (IT) getlocalrows();
3052  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
3053  IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
3054  delete [] locnrows;
3055 
3056  MPI_Datatype datatype;
3057  MPI_Type_contiguous(sizeof(std::pair<IT,NT>), MPI_CHAR, &datatype);
3058  MPI_Type_commit(&datatype);
3059 
3060  for(int i = 0; i < procrows; i++) // for all processor row (in order)
3061  {
3062  if(commGrid->GetRankInProcCol() == i) // only the ith processor row
3063  {
3064  IT localrows = spSeq->getnrow(); // same along the processor row
3065  std::vector< std::vector< std::pair<IT,NT> > > csr(localrows);
3066  if(commGrid->GetRankInProcRow() == 0) // get the head of processor row
3067  {
3068  IT localcols = spSeq->getncol(); // might be different on the last processor on this processor row
3069  MPI_Bcast(&localcols, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
3070  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over nonempty subcolumns
3071  {
3072  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3073  {
3074  csr[nzit.rowid()].push_back( std::make_pair(colit.colid(), nzit.value()) );
3075  }
3076  }
3077  }
3078  else // get the rest of the processors
3079  {
3080  IT n_perproc;
3081  MPI_Bcast(&n_perproc, 1, MPIType<IT>(), 0, commGrid->GetRowWorld());
3082  IT noffset = commGrid->GetRankInProcRow() * n_perproc;
3083  for(typename DER::SpColIter colit = spSeq->begcol(); colit != spSeq->endcol(); ++colit) // iterate over nonempty subcolumns
3084  {
3085  for(typename DER::SpColIter::NzIter nzit = spSeq->begnz(colit); nzit != spSeq->endnz(colit); ++nzit)
3086  {
3087  csr[nzit.rowid()].push_back( std::make_pair(colit.colid() + noffset, nzit.value()) );
3088  }
3089  }
3090  }
3091  std::pair<IT,NT> * ents = NULL;
3092  int * gsizes = NULL, * dpls = NULL;
3093  if(commGrid->GetRankInProcRow() == 0) // only the head of processor row
3094  {
3095  out.open(filename.c_str(),std::ios_base::app);
3096  gsizes = new int[proccols];
3097  dpls = new int[proccols](); // displacements (zero initialized pid)
3098  }
3099  for(int j = 0; j < localrows; ++j)
3100  {
3101  IT rowcnt = 0;
3102  sort(csr[j].begin(), csr[j].end());
3103  int mysize = csr[j].size();
3104  MPI_Gather(&mysize, 1, MPI_INT, gsizes, 1, MPI_INT, 0, commGrid->GetRowWorld());
3105  if(commGrid->GetRankInProcRow() == 0)
3106  {
3107  rowcnt = std::accumulate(gsizes, gsizes+proccols, static_cast<IT>(0));
3108  std::partial_sum(gsizes, gsizes+proccols-1, dpls+1);
3109  ents = new std::pair<IT,NT>[rowcnt]; // nonzero entries in the j'th local row
3110  }
3111 
3112  // int MPI_Gatherv (void* sbuf, int scount, MPI_Datatype stype,
3113  // void* rbuf, int *rcount, int* displs, MPI_Datatype rtype, int root, MPI_Comm comm)
3114  MPI_Gatherv(SpHelper::p2a(csr[j]), mysize, datatype, ents, gsizes, dpls, datatype, 0, commGrid->GetRowWorld());
3115  if(commGrid->GetRankInProcRow() == 0)
3116  {
3117  for(int k=0; k< rowcnt; ++k)
3118  {
3119  //out << j + roffset + 1 << "\t" << ents[k].first + 1 <<"\t" << ents[k].second << endl;
3120  if (!transpose)
3121  // regular
3122  out << j + roffset + 1 << "\t" << ents[k].first + 1 << "\t";
3123  else
3124  // transpose row/column
3125  out << ents[k].first + 1 << "\t" << j + roffset + 1 << "\t";
3126  handler.save(out, ents[k].second, j + roffset, ents[k].first);
3127  out << std::endl;
3128  }
3129  delete [] ents;
3130  }
3131  }
3132  if(commGrid->GetRankInProcRow() == 0)
3133  {
3134  DeleteAll(gsizes, dpls);
3135  out.close();
3136  }
3137  } // end_if the ith processor row
3138  MPI_Barrier(commGrid->GetWorld()); // signal the end of ith processor row iteration (so that all processors block)
3139  }
3140 }
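/*
 * Illustrative usage sketch for SaveGathered. The handler type below is a hypothetical
 * placeholder: any handler providing save(std::ofstream&, const NT&, IT, IT) that prints
 * the numerical value is used the same way by the loop above.
 *
 *   struct PlainSaveHandler
 *   {
 *       template <typename IT>
 *       void save(std::ofstream & os, const double & v, IT row, IT col) { os << v; }
 *   };
 *   A.SaveGathered("A_gathered.mtx", PlainSaveHandler(), false);  // false = do not swap row/column
 */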
3141 
3142 
3145 template <class IT, class NT, class DER>
3146 MPI_File SpParMat< IT,NT,DER >::TupleRead1stPassNExchange (const std::string & filename, TYPE2SEND * & senddata, IT & totsend,
3147  FullyDistVec<IT,STRASARRAY> & distmapper, uint64_t & totallength)
3148 {
3149  int myrank = commGrid->GetRank();
3150  int nprocs = commGrid->GetSize();
3151 
3152  MPI_Offset fpos, end_fpos;
3153  struct stat st; // get file size
3154  if (stat(filename.c_str(), &st) == -1)
3155  {
3156  MPI_Abort(MPI_COMM_WORLD, NOFILE);
3157  }
3158  int64_t file_size = st.st_size;
3159  if(myrank == 0) // report the file size once
3160  {
3161  std::cout << "File is " << file_size << " bytes" << std::endl;
3162  }
3163  fpos = myrank * file_size / nprocs;
3164 
3165  if(myrank != (nprocs-1)) end_fpos = (myrank + 1) * file_size / nprocs;
3166  else end_fpos = file_size;
3167 
3168  MPI_File mpi_fh;
3169  MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
3170 
3171  typedef std::map<std::string, uint64_t> KEYMAP; // due to potential (but extremely unlikely) collisions in MurmurHash, make the key of the std::map the string itself
3172  std::vector< KEYMAP > allkeys(nprocs); // the map keeps the outgoing data unique; we could have applied this to HipMer too
3173 
3174  std::vector<std::string> lines;
3175  bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
3176  int64_t entriesread = lines.size();
3177  SpHelper::ProcessLinesWithStringKeys(allkeys, lines,nprocs);
3178 
3179  while(!finished)
3180  {
3181  finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
3182 
3183  entriesread += lines.size();
3184  SpHelper::ProcessLinesWithStringKeys(allkeys, lines,nprocs);
3185  }
3186  int64_t allentriesread;
3187  MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3188 #ifdef COMBBLAS_DEBUG
3189  if(myrank == 0)
3190  std::cout << "Initial reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3191 #endif
3192 
3193  int * sendcnt = new int[nprocs];
3194  int * recvcnt = new int[nprocs];
3195  for(int i=0; i<nprocs; ++i)
3196  sendcnt[i] = allkeys[i].size();
3197 
3198  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // share the counts
3199  int * sdispls = new int[nprocs]();
3200  int * rdispls = new int[nprocs]();
3201  std::partial_sum(sendcnt, sendcnt+nprocs-1, sdispls+1);
3202  std::partial_sum(recvcnt, recvcnt+nprocs-1, rdispls+1);
3203  totsend = std::accumulate(sendcnt,sendcnt+nprocs, static_cast<IT>(0));
3204  IT totrecv = std::accumulate(recvcnt,recvcnt+nprocs, static_cast<IT>(0));
3205 
3206  assert((totsend < std::numeric_limits<int>::max()));
3207  assert((totrecv < std::numeric_limits<int>::max()));
3208 
3209  // The following are declared in SpParMat.h
3210  // typedef std::array<char, MAXVERTNAME> STRASARRAY;
3211  // typedef std::pair< STRASARRAY, uint64_t> TYPE2SEND;
3212  senddata = new TYPE2SEND[totsend];
3213 
3214  #pragma omp parallel for
3215  for(int i=0; i<nprocs; ++i)
3216  {
3217  size_t j = 0;
3218  for(auto pobj:allkeys[i])
3219  {
3220  // A naked C-style array is neither copyable nor assignable, but std::pair requires both, hence we use std::array
3221  std::array<char, MAXVERTNAME> vname;
3222  std::copy( pobj.first.begin(), pobj.first.end(), vname.begin() );
3223  if(pobj.first.length() < MAXVERTNAME) vname[pobj.first.length()] = '\0'; // null termination
3224 
3225  senddata[sdispls[i]+j] = TYPE2SEND(vname, pobj.second);
3226  j++;
3227  }
3228  }
3229  allkeys.clear(); // allkeys is no longer needed after this point
3230 
3231  MPI_Datatype MPI_HASH;
3232  MPI_Type_contiguous(sizeof(TYPE2SEND), MPI_CHAR, &MPI_HASH);
3233  MPI_Type_commit(&MPI_HASH);
3234 
3235  TYPE2SEND * recvdata = new TYPE2SEND[totrecv];
3236  MPI_Alltoallv(senddata, sendcnt, sdispls, MPI_HASH, recvdata, recvcnt, rdispls, MPI_HASH, commGrid->GetWorld());
3237  // do not delete send buffers yet as we will use them to recv back the data
3238 
3239  std::set< std::pair<uint64_t, std::string> > uniqsorted;
3240  for(IT i=0; i< totrecv; ++i)
3241  {
3242  auto locnull = std::find(recvdata[i].first.begin(), recvdata[i].first.end(), '\0'); // find the null character (or string::end)
3243  std::string strtmp(recvdata[i].first.begin(), locnull); // range constructor
3244 
3245  uniqsorted.insert(std::make_pair(recvdata[i].second, strtmp));
3246  }
3247  uint64_t uniqsize = uniqsorted.size();
3248 
3249 #ifdef COMBBLAS_DEBUG
3250  if(myrank == 0)
3251  std::cout << "out of " << totrecv << " vertices received, " << uniqsize << " were unique" << std::endl;
3252 #endif
3253  uint64_t sizeuntil = 0;
3254  totallength = 0;
3255  MPI_Exscan( &uniqsize, &sizeuntil, 1, MPIType<uint64_t>(), MPI_SUM, commGrid->GetWorld() );
3256  MPI_Allreduce(&uniqsize, &totallength, 1, MPIType<uint64_t>(), MPI_SUM, commGrid->GetWorld());
3257  if(myrank == 0) sizeuntil = 0; // because MPI_Exscan says the recvbuf in process 0 is undefined
3258 
3259  distmapper = FullyDistVec<IT,STRASARRAY>(commGrid, totallength,STRASARRAY{});
3260 
3261  // invindex does not conform to FullyDistVec boundaries; otherwise its contents are essentially the same as distmapper's
3262  KEYMAP invindex; // KEYMAP is map<string, uint64_t>.
3263  uint64_t locindex = 0;
3264  std::vector< std::vector< IT > > locs_send(nprocs);
3265  std::vector< std::vector< std::string > > data_send(nprocs);
3266  int * map_scnt = new int[nprocs](); // send counts for this map only (not to be confused with the other sendcnt)
3267  for(auto itr = uniqsorted.begin(); itr != uniqsorted.end(); ++itr)
3268  {
3269  uint64_t globalindex = sizeuntil + locindex;
3270  invindex.insert(std::make_pair(itr->second, globalindex));
3271 
3272  IT newlocid;
3273  int owner = distmapper.Owner(globalindex, newlocid);
3274 
3275  //if(myrank == 0)
3276  // std::cout << "invindex received " << itr->second << " with global index " << globalindex << " to be owned by " << owner << " with index " << newlocid << std::endl;
3277 
3278  locs_send[owner].push_back(newlocid);
3279  data_send[owner].push_back(itr->second);
3280  map_scnt[owner]++;
3281  locindex++;
3282  }
3283  uniqsorted.clear(); // clear memory
3284 
3285 
3286  /* BEGIN: Redistributing the permutation vector to fit the FullyDistVec semantics */
3287  SpParHelper::ReDistributeToVector(map_scnt, locs_send, data_send, distmapper.arr, commGrid->GetWorld()); // map_scnt is deleted here
3288  /* END: Redistributing the permutation vector to fit the FullyDistVec semantics */
3289 
3290  for(IT i=0; i< totrecv; ++i)
3291  {
3292  auto locnull = std::find(recvdata[i].first.begin(), recvdata[i].first.end(), '\0');
3293  std::string searchstr(recvdata[i].first.begin(), locnull); // range constructor
3294 
3295  auto resp = invindex.find(searchstr); // recvdata[i] is of type pair< STRASARRAY, uint64_t>
3296  if (resp != invindex.end())
3297  {
3298  recvdata[i].second = resp->second; // now instead of random numbers, recvdata's second entry will be its new index
3299  }
3300  else
3301  std::cout << "Assertion failed at proc " << myrank << ": the absence of the entry in invindex is unexpected!!!" << std::endl;
3302  }
3303  MPI_Alltoallv(recvdata, recvcnt, rdispls, MPI_HASH, senddata, sendcnt, sdispls, MPI_HASH, commGrid->GetWorld());
3304  DeleteAll(recvdata, sendcnt, recvcnt, sdispls, rdispls);
3305  MPI_Type_free(&MPI_HASH);
3306 
3307  // the following gets deleted here: allkeys
3308  return mpi_fh;
3309 }
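/*
 * Summary of the first pass above: each processor reads its byte range of the file,
 * hashes the string vertex labels it encounters, and sends each unique label to the
 * processor that owns its hash; the receivers assign consecutive global ids to the
 * unique labels (offsets via MPI_Exscan), redistribute the resulting id-to-label map
 * into distmapper's FullyDistVec boundaries, and return the assigned ids to the original
 * senders in senddata. The still-open MPI_File handle is handed back so the caller can
 * re-read the tuples in a second pass and replace labels with these numeric ids.
 */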
3310 
3311 
3312 
3317 template <class IT, class NT, class DER>
3318 template <typename _BinaryOperation>
3319 FullyDistVec<IT, std::array<char, MAXVERTNAME> > SpParMat< IT,NT,DER >::ReadGeneralizedTuples (const std::string & filename, _BinaryOperation BinOp)
3320 {
3321  int myrank = commGrid->GetRank();
3322  int nprocs = commGrid->GetSize();
3323  TYPE2SEND * senddata;
3324  IT totsend;
3325  uint64_t totallength;
3326  FullyDistVec<IT,STRASARRAY> distmapper(commGrid); // array<char, MAXVERTNAME> is chosen over std::string because std::array is a contiguous container and an aggregate (hence safe to send over MPI)
3327 
3328  MPI_File mpi_fh = TupleRead1stPassNExchange(filename, senddata, totsend, distmapper, totallength);
3329 
3330  typedef std::map<std::string, uint64_t> KEYMAP;
3331  KEYMAP ultimateperm; // the ultimate permutation
3332  for(IT i=0; i< totsend; ++i)
3333  {
3334  auto locnull = std::find(senddata[i].first.begin(), senddata[i].first.end(), '\0');
3335 
3336  std::string searchstr(senddata[i].first.begin(), locnull);
3337  auto ret = ultimateperm.emplace(std::make_pair(searchstr, senddata[i].second));
3338  if(!ret.second) // the second is the boolean that tells success
3339  {
3340  // remember, we only sent unique vertex ids in the first place so we are expecting unique values in return
3341  std::cout << "the duplication in ultimateperm is unexpected!!!" << std::endl;
3342  }
3343  }
3344  delete [] senddata;
3345 
3346  // rename the data now, first reset file pointers
3347  MPI_Offset fpos, end_fpos;
3348  struct stat st; // get file size
3349  if (stat(filename.c_str(), &st) == -1)
3350  {
3351  MPI_Abort(MPI_COMM_WORLD, NOFILE);
3352  }
3353  int64_t file_size = st.st_size;
3354 
3355  fpos = myrank * file_size / nprocs;
3356 
3357  if(myrank != (nprocs-1)) end_fpos = (myrank + 1) * file_size / nprocs;
3358  else end_fpos = file_size;
3359 
3360  typedef typename DER::LocalIT LIT;
3361  std::vector<LIT> rows;
3362  std::vector<LIT> cols;
3363  std::vector<NT> vals;
3364 
3365  std::vector<std::string> lines;
3366  bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
3367  int64_t entriesread = lines.size();
3368 
3369  SpHelper::ProcessStrLinesNPermute(rows, cols, vals, lines, ultimateperm);
3370 
3371  while(!finished)
3372  {
3373  finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
3374  entriesread += lines.size();
3375  SpHelper::ProcessStrLinesNPermute(rows, cols, vals, lines, ultimateperm);
3376  }
3377  int64_t allentriesread;
3378  MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3379 #ifdef COMBBLAS_DEBUG
3380  if(myrank == 0)
3381  std::cout << "Second reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3382 #endif
3383 
3384  MPI_File_close(&mpi_fh);
3385  std::vector< std::vector < std::tuple<LIT,LIT,NT> > > data(nprocs);
3386 
3387  LIT locsize = rows.size(); // remember: locsize need not equal entriesread unless the matrix is unsymmetric
3388  for(LIT i=0; i<locsize; ++i)
3389  {
3390  LIT lrow, lcol;
3391  int owner = Owner(totallength, totallength, rows[i], cols[i], lrow, lcol);
3392  data[owner].push_back(std::make_tuple(lrow,lcol,vals[i]));
3393  }
3394  std::vector<LIT>().swap(rows);
3395  std::vector<LIT>().swap(cols);
3396  std::vector<NT>().swap(vals);
3397 
3398 #ifdef COMBBLAS_DEBUG
3399  if(myrank == 0)
3400  std::cout << "Packing to recipients finished, about to send..." << std::endl;
3401 #endif
3402 
3403  if(spSeq) delete spSeq;
3404  SparseCommon(data, locsize, totallength, totallength, BinOp);
3405  // PrintInfo();
3406  // distmapper.ParallelWrite("distmapper.mtx", 1, CharArraySaveHandler());
3407  return distmapper;
3408 }
3409 
3410 
3411 
3415 template <class IT, class NT, class DER>
3416 template <typename _BinaryOperation>
3417 void SpParMat< IT,NT,DER >::ParallelReadMM (const std::string & filename, bool onebased, _BinaryOperation BinOp)
3418 {
3419  int32_t type = -1;
3420  int32_t symmetric = 0;
3421  int64_t nrows, ncols, nonzeros;
3422  int64_t linesread = 0;
3423 
3424  FILE *f;
3425  int myrank = commGrid->GetRank();
3426  int nprocs = commGrid->GetSize();
3427  if(myrank == 0)
3428  {
3429  MM_typecode matcode;
3430  if ((f = fopen(filename.c_str(), "r")) == NULL)
3431  {
3432  printf("COMBBLAS: Matrix-market file %s can not be found\n", filename.c_str());
3433  MPI_Abort(MPI_COMM_WORLD, NOFILE);
3434  }
3435  if (mm_read_banner(f, &matcode) != 0)
3436  {
3437  printf("Could not process Matrix Market banner.\n");
3438  exit(1);
3439  }
3440  linesread++;
3441 
3442  if (mm_is_complex(matcode))
3443  {
3444  printf("Sorry, this application does not support complex types\n");
3445  printf("Matrix Market type: [%s]\n", mm_typecode_to_str(matcode));
3446  }
3447  else if(mm_is_real(matcode))
3448  {
3449  std::cout << "Matrix is Float" << std::endl;
3450  type = 0;
3451  }
3452  else if(mm_is_integer(matcode))
3453  {
3454  std::cout << "Matrix is Integer" << std::endl;
3455  type = 1;
3456  }
3457  else if(mm_is_pattern(matcode))
3458  {
3459  std::cout << "Matrix is Boolean" << std::endl;
3460  type = 2;
3461  }
3462  if(mm_is_symmetric(matcode) || mm_is_hermitian(matcode))
3463  {
3464  std::cout << "Matrix is symmetric" << std::endl;
3465  symmetric = 1;
3466  }
3467  int ret_code;
3468  if ((ret_code = mm_read_mtx_crd_size(f, &nrows, &ncols, &nonzeros, &linesread)) !=0) // ABAB: mm_read_mtx_crd_size made 64-bit friendly
3469  exit(1);
3470 
3471  std::cout << "Total number of nonzeros expected across all processors is " << nonzeros << std::endl;
3472 
3473  }
3474  MPI_Bcast(&type, 1, MPI_INT, 0, commGrid->commWorld);
3475  MPI_Bcast(&symmetric, 1, MPI_INT, 0, commGrid->commWorld);
3476  MPI_Bcast(&nrows, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
3477  MPI_Bcast(&ncols, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
3478  MPI_Bcast(&nonzeros, 1, MPIType<int64_t>(), 0, commGrid->commWorld);
3479 
3480  // determine the file size so that each processor can compute its own byte range
3481  struct stat st; // get file size
3482  if (stat(filename.c_str(), &st) == -1)
3483  {
3484  MPI_Abort(MPI_COMM_WORLD, NOFILE);
3485  }
3486  int64_t file_size = st.st_size;
3487  MPI_Offset fpos, end_fpos, endofheader;
3488  if(commGrid->GetRank() == 0) // the offset needs to be for this rank
3489  {
3490  std::cout << "File is " << file_size << " bytes" << std::endl;
3491  fpos = ftell(f);
3492  endofheader = fpos;
3493  MPI_Bcast(&endofheader, 1, MPIType<MPI_Offset>(), 0, commGrid->commWorld);
3494  fclose(f);
3495  }
3496  else
3497  {
3498  MPI_Bcast(&endofheader, 1, MPIType<MPI_Offset>(), 0, commGrid->commWorld); // receive the file loc at the end of header
3499  fpos = endofheader + myrank * (file_size-endofheader) / nprocs;
3500  }
3501  if(myrank != (nprocs-1)) end_fpos = endofheader + (myrank + 1) * (file_size-endofheader) / nprocs;
3502  else end_fpos = file_size;
3503 
3504  MPI_File mpi_fh;
3505  MPI_File_open (commGrid->commWorld, const_cast<char*>(filename.c_str()), MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fh);
3506 
3507 
3508  typedef typename DER::LocalIT LIT;
3509  std::vector<LIT> rows;
3510  std::vector<LIT> cols;
3511  std::vector<NT> vals;
3512 
3513  std::vector<std::string> lines;
3514  bool finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, true, lines, myrank);
3515  int64_t entriesread = lines.size();
3516  SpHelper::ProcessLines(rows, cols, vals, lines, symmetric, type, onebased);
3517  MPI_Barrier(commGrid->commWorld);
3518 
3519  while(!finished)
3520  {
3521  finished = SpParHelper::FetchBatch(mpi_fh, fpos, end_fpos, false, lines, myrank);
3522  entriesread += lines.size();
3523  SpHelper::ProcessLines(rows, cols, vals, lines, symmetric, type, onebased);
3524  }
3525  int64_t allentriesread;
3526  MPI_Reduce(&entriesread, &allentriesread, 1, MPIType<int64_t>(), MPI_SUM, 0, commGrid->commWorld);
3527 #ifdef COMBBLAS_DEBUG
3528  if(myrank == 0)
3529  std::cout << "Reading finished. Total number of entries read across all processors is " << allentriesread << std::endl;
3530 #endif
3531 
3532  std::vector< std::vector < std::tuple<LIT,LIT,NT> > > data(nprocs);
3533 
3534  LIT locsize = rows.size(); // remember: locsize need not equal entriesread unless the matrix is unsymmetric (ProcessLines duplicates symmetric entries)
3535  for(LIT i=0; i<locsize; ++i)
3536  {
3537  LIT lrow, lcol;
3538  int owner = Owner(nrows, ncols, rows[i], cols[i], lrow, lcol);
3539  data[owner].push_back(std::make_tuple(lrow,lcol,vals[i]));
3540  }
3541  std::vector<LIT>().swap(rows);
3542  std::vector<LIT>().swap(cols);
3543  std::vector<NT>().swap(vals);
3544 
3545 #ifdef COMBBLAS_DEBUG
3546  if(myrank == 0)
3547  std::cout << "Packing to recipients finished, about to send..." << std::endl;
3548 #endif
3549 
3550  if(spSeq) delete spSeq;
3551  SparseCommon(data, locsize, nrows, ncols, BinOp);
3552 }
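/*
 * Illustrative usage sketch (grid and type names are placeholders, not defined in this
 * file); duplicate entries in the file are combined by SparseCommon using the supplied
 * binary functor:
 *
 *   std::shared_ptr<CommGrid> fullWorld(new CommGrid(MPI_COMM_WORLD, 0, 0));
 *   SpParMat<int64_t, double, SpDCCols<int64_t, double>> A(fullWorld);
 *   A.ParallelReadMM("input.mtx", true, maximum<double>());  // true = indices in the file are 1-based
 */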
3553 
3554 
3558 template <class IT, class NT, class DER>
3559 template <class HANDLER>
3560 void SpParMat< IT,NT,DER >::ReadDistribute (const std::string & filename, int master, bool nonum, HANDLER handler, bool transpose, bool pario)
3561 {
3562 #ifdef TAU_PROFILE
3563  TAU_PROFILE_TIMER(rdtimer, "ReadDistribute", "void SpParMat::ReadDistribute (const string & , int, bool, HANDLER, bool)", TAU_DEFAULT);
3564  TAU_PROFILE_START(rdtimer);
3565 #endif
3566 
3567  std::ifstream infile;
3568  FILE * binfile = NULL; // points to "past header" if the file is binary
3569  int seeklength = 0;
3570  HeaderInfo hfile;
3571  if(commGrid->GetRank() == master) // 1 processor
3572  {
3573  hfile = ParseHeader(filename, binfile, seeklength);
3574  }
3575  MPI_Bcast(&seeklength, 1, MPI_INT, master, commGrid->commWorld);
3576 
3577  IT total_m, total_n, total_nnz;
3578  IT m_perproc = 0, n_perproc = 0;
3579 
3580  int colneighs = commGrid->GetGridRows(); // number of neighbors along this processor column (including oneself)
3581  int rowneighs = commGrid->GetGridCols(); // number of neighbors along this processor row (including oneself)
3582 
3583  IT buffpercolneigh = MEMORYINBYTES / (colneighs * (2 * sizeof(IT) + sizeof(NT)));
3584  IT buffperrowneigh = MEMORYINBYTES / (rowneighs * (2 * sizeof(IT) + sizeof(NT)));
3585  if(pario)
3586  {
3587  // since all colneighs will be reading the data at the same time,
3588  // chances are they all read data destined for a single processor along the row;
3589  // in that case we need buffperrowneigh >= colneighs * buffpercolneigh
3590  // in order not to overflow
3591  buffpercolneigh /= colneighs;
3592  if(seeklength == 0)
3593  SpParHelper::Print("COMBBLAS: Parallel I/O requested but binary header is corrupted\n", commGrid->GetWorld());
3594  }
3595 
3596  // make sure that buffperrowneigh >= buffpercolneigh to cover for this pathological case:
3597  // -- all data received by a given column head (by vertical communication) are headed to a single processor along the row
3598  // -- then making sure buffperrowneigh >= buffpercolneigh guarantees that the horizontal buffer will never overflow
3599  buffperrowneigh = std::max(buffperrowneigh, buffpercolneigh);
3600  if(std::max(buffpercolneigh * colneighs, buffperrowneigh * rowneighs) > std::numeric_limits<int>::max())
3601  {
3602  SpParHelper::Print("COMBBLAS: MPI doesn't support sending int64_t send/recv counts or displacements\n", commGrid->GetWorld());
3603  }
3604 
3605  int * cdispls = new int[colneighs];
3606  for (IT i=0; i<colneighs; ++i) cdispls[i] = i*buffpercolneigh;
3607  int * rdispls = new int[rowneighs];
3608  for (IT i=0; i<rowneighs; ++i) rdispls[i] = i*buffperrowneigh;
3609 
3610  int *ccurptrs = NULL, *rcurptrs = NULL;
3611  int recvcount = 0;
3612  IT * rows = NULL;
3613  IT * cols = NULL;
3614  NT * vals = NULL;
3615 
3616  // Note: all other column heads that initiate the horizontal communication have the same "rankinrow" as the master
3617  int rankincol = commGrid->GetRankInProcCol(master); // get master's rank in its processor column
3618  int rankinrow = commGrid->GetRankInProcRow(master);
3619  std::vector< std::tuple<IT, IT, NT> > localtuples;
3620 
3621  if(commGrid->GetRank() == master) // 1 processor
3622  {
3623  if( !hfile.fileexists )
3624  {
3625  SpParHelper::Print( "COMBBLAS: Input file doesn't exist\n", commGrid->GetWorld());
3626  total_n = 0; total_m = 0;
3627  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3628  return;
3629  }
3630  if (hfile.headerexists && hfile.format == 1)
3631  {
3632  SpParHelper::Print("COMBBLAS: Ascii input with binary headers is not supported\n", commGrid->GetWorld());
3633  total_n = 0; total_m = 0;
3634  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3635  return;
3636  }
3637  if ( !hfile.headerexists ) // no header - ascii file (at this point, file exists)
3638  {
3639  infile.open(filename.c_str());
3640  char comment[256];
3641  infile.getline(comment,256);
3642  while(comment[0] == '%')
3643  {
3644  infile.getline(comment,256);
3645  }
3646  std::stringstream ss;
3647  ss << std::string(comment);
3648  ss >> total_m >> total_n >> total_nnz;
3649  if(pario)
3650  {
3651  SpParHelper::Print("COMBBLAS: Parallel I/O requested for a headerless text file, aborting\n", commGrid->GetWorld());
3652  total_n = 0; total_m = 0;
3653  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3654  return;
3655  }
3656  }
3657  else // hfile.headerexists && hfile.format == 0
3658  {
3659  total_m = hfile.m;
3660  total_n = hfile.n;
3661  total_nnz = hfile.nnz;
3662  }
3663  m_perproc = total_m / colneighs;
3664  n_perproc = total_n / rowneighs;
3665  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3666  AllocateSetBuffers(rows, cols, vals, rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
3667 
3668  if(seeklength > 0 && pario) // sqrt(p) processors also do parallel binary i/o
3669  {
3670  IT entriestoread = total_nnz / colneighs;
3671  #ifdef IODEBUG
3672  std::ofstream oput;
3673  commGrid->OpenDebugFile("Read", oput);
3674  oput << "Total nnz: " << total_nnz << " entries to read: " << entriestoread << std::endl;
3675  oput.close();
3676  #endif
3677  ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
3678  rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
3679  }
3680  else // only this (master) is doing I/O (text or binary)
3681  {
3682  IT temprow, tempcol;
3683  NT tempval;
3684  IT ntrow = 0, ntcol = 0; // not transposed row and column index
3685  char line[1024];
3686  bool nonumline = nonum;
3687  IT cnz = 0;
3688  for(; cnz < total_nnz; ++cnz)
3689  {
3690  int colrec;
3691  size_t commonindex;
3692  std::stringstream linestream;
3693  if( (!hfile.headerexists) && (!infile.eof()))
3694  {
3695  // read one line at a time so that missing numerical values can be detected
3696  infile.getline(line, 1024);
3697  linestream << line;
3698  linestream >> temprow >> tempcol;
3699  if (!nonum)
3700  {
3701  // see if this line has a value
3702  linestream >> std::skipws;
3703  nonumline = linestream.eof();
3704  }
3705  --temprow; // file is 1-based whereas C-arrays are 0-based
3706  --tempcol;
3707  ntrow = temprow;
3708  ntcol = tempcol;
3709  }
3710  else if(hfile.headerexists && (!feof(binfile)) )
3711  {
3712  handler.binaryfill(binfile, temprow , tempcol, tempval);
3713  }
3714  if (transpose)
3715  {
3716  IT swap = temprow;
3717  temprow = tempcol;
3718  tempcol = swap;
3719  }
3720  colrec = std::min(static_cast<int>(temprow / m_perproc), colneighs-1); // recipient processor along the column
3721  commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
3722 
3723  rows[ commonindex ] = temprow;
3724  cols[ commonindex ] = tempcol;
3725  if( (!hfile.headerexists) && (!infile.eof()))
3726  {
3727  vals[ commonindex ] = nonumline ? handler.getNoNum(ntrow, ntcol) : handler.read(linestream, ntrow, ntcol); //tempval;
3728  }
3729  else if(hfile.headerexists && (!feof(binfile)) )
3730  {
3731  vals[ commonindex ] = tempval;
3732  }
3733  ++ (ccurptrs[colrec]);
3734  if(ccurptrs[colrec] == buffpercolneigh || (cnz == (total_nnz-1)) ) // one buffer is full, or file is done !
3735  {
3736  MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld); // first, send the receive counts
3737 
3738  // generate space for own recv data ... (use arrays because vector<bool> is crippled, if NT=bool)
3739  IT * temprows = new IT[recvcount];
3740  IT * tempcols = new IT[recvcount];
3741  NT * tempvals = new NT[recvcount];
3742 
3743  // then, send all buffers to their recipients ...
3744  MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
3745  MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
3746  MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankincol, commGrid->colWorld);
3747 
3748  std::fill_n(ccurptrs, colneighs, 0); // finally, reset current pointers !
3749  DeleteAll(rows, cols, vals);
3750 
3751  HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
3752  buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
3753 
3754  if( cnz != (total_nnz-1) ) // otherwise the loop will exit with no one to reclaim the memory
3755  {
3756  // reuse these buffers for the next vertical communication
3757  rows = new IT [ buffpercolneigh * colneighs ];
3758  cols = new IT [ buffpercolneigh * colneighs ];
3759  vals = new NT [ buffpercolneigh * colneighs ];
3760  }
3761  } // end_if for "send buffer is full" case
3762  } // end_for for "cnz < total_nnz" case
3763  assert (cnz == total_nnz);
3764 
3765  // Signal the end of file to other processors along the column
3766  std::fill_n(ccurptrs, colneighs, std::numeric_limits<int>::max());
3767  MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld);
3768 
3769  // And along the row ...
3770  std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
3771  MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
3772  } // end of "else" (only one processor reads) block
3773  } // end_if for "master processor" case
3774  else if( commGrid->OnSameProcCol(master) ) // (r-1) processors
3775  {
3776  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3777  m_perproc = total_m / colneighs;
3778  n_perproc = total_n / rowneighs;
3779 
3780  if(seeklength > 0 && pario) // these processors also do parallel binary i/o
3781  {
3782  binfile = fopen(filename.c_str(), "rb");
3783  IT entrysize = handler.entrylength();
3784  int myrankincol = commGrid->GetRankInProcCol();
3785  IT perreader = total_nnz / colneighs;
3786  IT read_offset = entrysize * static_cast<IT>(myrankincol) * perreader + seeklength;
3787  IT entriestoread = perreader;
3788  if (myrankincol == colneighs-1)
3789  entriestoread = total_nnz - static_cast<IT>(myrankincol) * perreader;
3790  fseek(binfile, read_offset, SEEK_SET);
3791 
3792  #ifdef IODEBUG
3793  std::ofstream oput;
3794  commGrid->OpenDebugFile("Read", oput);
3795  oput << "Total nnz: " << total_nnz << " OFFSET : " << read_offset << " entries to read: " << entriestoread << std::endl;
3796  oput.close();
3797  #endif
3798 
3799  AllocateSetBuffers(rows, cols, vals, rcurptrs, ccurptrs, rowneighs, colneighs, buffpercolneigh);
3800  ReadAllMine(binfile, rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
3801  rowneighs, colneighs, buffperrowneigh, buffpercolneigh, entriestoread, handler, rankinrow, transpose);
3802  }
3803  else // only master does the I/O
3804  {
3805  while(total_n > 0 || total_m > 0) // otherwise input file does not exist !
3806  {
3807  // void MPI::Comm::Scatterv(const void* sendbuf, const int sendcounts[], const int displs[], const MPI::Datatype& sendtype,
3808  // void* recvbuf, int recvcount, const MPI::Datatype & recvtype, int root) const
3809  // The outcome is as if the root executed n send operations,
3810  // MPI_Send(sendbuf + displs[i] * extent(sendtype), sendcounts[i], sendtype, i, ...)
3811  // and each process executed a receive,
3812  // MPI_Recv(recvbuf, recvcount, recvtype, root, ...)
3813  // The send buffer is ignored for all nonroot processes.
3814 
3815  MPI_Scatter(ccurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankincol, commGrid->colWorld); // first receive the receive counts ...
3816  if( recvcount == std::numeric_limits<int>::max()) break;
3817 
3818  // create space for incoming data ...
3819  IT * temprows = new IT[recvcount];
3820  IT * tempcols = new IT[recvcount];
3821  NT * tempvals = new NT[recvcount];
3822 
3823  // receive actual data ... (first 4 arguments are ignored in the receiver side)
3824  MPI_Scatterv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
3825  MPI_Scatterv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankincol, commGrid->colWorld);
3826  MPI_Scatterv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankincol, commGrid->colWorld);
3827 
3828  // now, send the data along the horizontal
3829  rcurptrs = new int[rowneighs];
3830  std::fill_n(rcurptrs, rowneighs, 0);
3831 
3832  // HorizontalSend allocates new rows/cols/vals buffers, resizes the temp_xxx arrays, and finally frees all six arrays itself
3833  HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls,
3834  buffperrowneigh, rowneighs, recvcount, m_perproc, n_perproc, rankinrow);
3835  }
3836  }
3837 
3838  // Signal the end of file to other processors along the row
3839  std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
3840  MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
3841  delete [] rcurptrs;
3842  }
3843  else // r * (s-1) processors that only participate in the horizontal communication step
3844  {
3845  BcastEssentials(commGrid->commWorld, total_m, total_n, total_nnz, master);
3846  m_perproc = total_m / colneighs;
3847  n_perproc = total_n / rowneighs;
3848  while(total_n > 0 || total_m > 0) // otherwise input file does not exist !
3849  {
3850  // receive the receive count
3851  MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
3852  if( recvcount == std::numeric_limits<int>::max())
3853  break;
3854 
3855  // create space for incoming data ...
3856  IT * temprows = new IT[recvcount];
3857  IT * tempcols = new IT[recvcount];
3858  NT * tempvals = new NT[recvcount];
3859 
3860  MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
3861  MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
3862  MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankinrow, commGrid->rowWorld);
3863 
3864  // now push what is ours to tuples
3865  IT moffset = commGrid->myprocrow * m_perproc;
3866  IT noffset = commGrid->myproccol * n_perproc;
3867 
3868  for(IT i=0; i< recvcount; ++i)
3869  {
3870  localtuples.push_back( std::make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
3871  }
3872  DeleteAll(temprows,tempcols,tempvals);
3873  }
3874  }
3875  DeleteAll(cdispls, rdispls);
3876  std::tuple<IT,IT,NT> * arrtuples = new std::tuple<IT,IT,NT>[localtuples.size()]; // the vector will go out of scope, make it stick !
3877  std::copy(localtuples.begin(), localtuples.end(), arrtuples);
3878 
3879  IT localm = (commGrid->myprocrow != (commGrid->grrows-1))? m_perproc: (total_m - (m_perproc * (commGrid->grrows-1)));
3880  IT localn = (commGrid->myproccol != (commGrid->grcols-1))? n_perproc: (total_n - (n_perproc * (commGrid->grcols-1)));
3881  spSeq->Create( localtuples.size(), localm, localn, arrtuples); // the deletion of arrtuples[] is handled by SpMat::Create
3882 
3883 #ifdef TAU_PROFILE
3884  TAU_PROFILE_STOP(rdtimer);
3885 #endif
3886  return;
3887 }
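/*
 * Summary of the communication pattern in ReadDistribute: the master (or, with pario and
 * a binary header, the master's whole processor column) reads the file and scatters
 * (row, col, val) triples down its processor column in buffpercolneigh-sized chunks; the
 * head of each processor row then scatters its share along the row (HorizontalSend), and
 * the remaining processors only take part in that horizontal step. End of input is
 * signalled by scattering counts of std::numeric_limits<int>::max().
 */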
3888 
3889 template <class IT, class NT, class DER>
3890 void SpParMat<IT,NT,DER>::AllocateSetBuffers(IT * & rows, IT * & cols, NT * & vals, int * & rcurptrs, int * & ccurptrs, int rowneighs, int colneighs, IT buffpercolneigh)
3891 {
3892  // allocate buffers on the heap as stack space is usually limited
3893  rows = new IT [ buffpercolneigh * colneighs ];
3894  cols = new IT [ buffpercolneigh * colneighs ];
3895  vals = new NT [ buffpercolneigh * colneighs ];
3896 
3897  ccurptrs = new int[colneighs];
3898  rcurptrs = new int[rowneighs];
3899  std::fill_n(ccurptrs, colneighs, 0); // fill with zero
3900  std::fill_n(rcurptrs, rowneighs, 0);
3901 }
3902 
3903 template <class IT, class NT, class DER>
3904 void SpParMat<IT,NT,DER>::BcastEssentials(MPI_Comm & world, IT & total_m, IT & total_n, IT & total_nnz, int master)
3905 {
3906  MPI_Bcast(&total_m, 1, MPIType<IT>(), master, world);
3907  MPI_Bcast(&total_n, 1, MPIType<IT>(), master, world);
3908  MPI_Bcast(&total_nnz, 1, MPIType<IT>(), master, world);
3909 }
3910 
3911 /*
3912  * @post {rows, cols, vals are pre-allocated on the heap after this call}
3913  * @post {ccurptrs are set to zero, so that if another call is made to this function without modifying ccurptrs, no data will be sent from this processor}
3914  */
3915 template <class IT, class NT, class DER>
3916 void SpParMat<IT,NT,DER>::VerticalSend(IT * & rows, IT * & cols, NT * & vals, std::vector< std::tuple<IT,IT,NT> > & localtuples, int * rcurptrs, int * ccurptrs, int * rdispls, int * cdispls,
3917  IT m_perproc, IT n_perproc, int rowneighs, int colneighs, IT buffperrowneigh, IT buffpercolneigh, int rankinrow)
3918 {
3919  // first, send/recv the counts ...
3920  int * colrecvdispls = new int[colneighs];
3921  int * colrecvcounts = new int[colneighs];
3922  MPI_Alltoall(ccurptrs, 1, MPI_INT, colrecvcounts, 1, MPI_INT, commGrid->colWorld); // share the request counts
3923  int totrecv = std::accumulate(colrecvcounts,colrecvcounts+colneighs,0);
3924  colrecvdispls[0] = 0; // receive displacements are exact whereas send displacements have slack
3925  for(int i=0; i<colneighs-1; ++i)
3926  colrecvdispls[i+1] = colrecvdispls[i] + colrecvcounts[i];
3927 
3928  // generate space for own recv data ... (use arrays because vector<bool> is crippled, if NT=bool)
3929  IT * temprows = new IT[totrecv];
3930  IT * tempcols = new IT[totrecv];
3931  NT * tempvals = new NT[totrecv];
3932 
3933  // then, exchange all buffers with their recipients ...
3934  MPI_Alltoallv(rows, ccurptrs, cdispls, MPIType<IT>(), temprows, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
3935  MPI_Alltoallv(cols, ccurptrs, cdispls, MPIType<IT>(), tempcols, colrecvcounts, colrecvdispls, MPIType<IT>(), commGrid->colWorld);
3936  MPI_Alltoallv(vals, ccurptrs, cdispls, MPIType<NT>(), tempvals, colrecvcounts, colrecvdispls, MPIType<NT>(), commGrid->colWorld);
3937 
3938  // finally, reset current pointers !
3939  std::fill_n(ccurptrs, colneighs, 0);
3940  DeleteAll(colrecvdispls, colrecvcounts);
3941  DeleteAll(rows, cols, vals);
3942 
3943  // rcurptrs/rdispls are zero initialized scratch space
3944  HorizontalSend(rows, cols, vals,temprows, tempcols, tempvals, localtuples, rcurptrs, rdispls, buffperrowneigh, rowneighs, totrecv, m_perproc, n_perproc, rankinrow);
3945 
3946  // reuse these buffers for the next vertical communication
3947  rows = new IT [ buffpercolneigh * colneighs ];
3948  cols = new IT [ buffpercolneigh * colneighs ];
3949  vals = new NT [ buffpercolneigh * colneighs ];
3950 }
3951 
3952 
3959 template <class IT, class NT, class DER>
3960 template <class HANDLER>
3961 void SpParMat<IT,NT,DER>::ReadAllMine(FILE * binfile, IT * & rows, IT * & cols, NT * & vals, std::vector< std::tuple<IT,IT,NT> > & localtuples, int * rcurptrs, int * ccurptrs, int * rdispls, int * cdispls,
3962  IT m_perproc, IT n_perproc, int rowneighs, int colneighs, IT buffperrowneigh, IT buffpercolneigh, IT entriestoread, HANDLER handler, int rankinrow, bool transpose)
3963 {
3964  assert(entriestoread != 0);
3965  IT cnz = 0;
3966  IT temprow, tempcol;
3967  NT tempval;
3968  int finishedglobal = 1;
3969  while(cnz < entriestoread && !feof(binfile)) // this loop will execute at least once
3970  {
3971  handler.binaryfill(binfile, temprow , tempcol, tempval);
3972  if (transpose)
3973  {
3974  IT swap = temprow;
3975  temprow = tempcol;
3976  tempcol = swap;
3977  }
3978  int colrec = std::min(static_cast<int>(temprow / m_perproc), colneighs-1); // recipient processor along the column
3979  size_t commonindex = colrec * buffpercolneigh + ccurptrs[colrec];
3980  rows[ commonindex ] = temprow;
3981  cols[ commonindex ] = tempcol;
3982  vals[ commonindex ] = tempval;
3983  ++ (ccurptrs[colrec]);
3984  if(ccurptrs[colrec] == buffpercolneigh || (cnz == (entriestoread-1)) ) // one buffer is full, or this processor's share is done !
3985  {
3986  #ifdef IODEBUG
3987  std::ofstream oput;
3988  commGrid->OpenDebugFile("Read", oput);
3989  oput << "To column neighbors: ";
3990  std::copy(ccurptrs, ccurptrs+colneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
3991  oput.close();
3992  #endif
3993 
3994  VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
3995  rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
3996 
3997  if(cnz == (entriestoread-1)) // last execution of the outer loop
3998  {
3999  int finishedlocal = 1; // I am done, but let me check others
4000  MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4001  while(!finishedglobal)
4002  {
4003  #ifdef DEBUG
4004  std::ofstream oput;
4005  commGrid->OpenDebugFile("Read", oput);
4006  oput << "To column neighbors: ";
4007  std::copy(ccurptrs, ccurptrs+colneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4008  oput.close();
4009  #endif
4010 
4011  // postcondition of VerticalSend: ccurptrs are set to zero
4012  // if another call is made to this function without modifying ccurptrs, no data will be sent from this processor
4013  VerticalSend(rows, cols, vals, localtuples, rcurptrs, ccurptrs, rdispls, cdispls, m_perproc, n_perproc,
4014  rowneighs, colneighs, buffperrowneigh, buffpercolneigh, rankinrow);
4015 
4016  MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4017  }
4018  }
4019  else // the outer loop will continue executing
4020  {
4021  int finishedlocal = 0;
4022  MPI_Allreduce( &finishedlocal, &finishedglobal, 1, MPI_INT, MPI_BAND, commGrid->colWorld);
4023  }
4024  } // end_if for "send buffer is full" case
4025  ++cnz;
4026  }
4027 
4028  // signal the end to row neighbors
4029  std::fill_n(rcurptrs, rowneighs, std::numeric_limits<int>::max());
4030  int recvcount;
4031  MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld);
4032 }
4033 
4034 
4041 template <class IT, class NT, class DER>
4042 void SpParMat<IT,NT,DER>::HorizontalSend(IT * & rows, IT * & cols, NT * & vals, IT * & temprows, IT * & tempcols, NT * & tempvals, std::vector < std::tuple <IT,IT,NT> > & localtuples,
4043  int * rcurptrs, int * rdispls, IT buffperrowneigh, int rowneighs, int recvcount, IT m_perproc, IT n_perproc, int rankinrow)
4044 {
4045  rows = new IT [ buffperrowneigh * rowneighs ];
4046  cols = new IT [ buffperrowneigh * rowneighs ];
4047  vals = new NT [ buffperrowneigh * rowneighs ];
4048 
4049  // prepare to send the data along the horizontal
4050  for(int i=0; i< recvcount; ++i)
4051  {
4052  int rowrec = std::min(static_cast<int>(tempcols[i] / n_perproc), rowneighs-1);
4053  rows[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = temprows[i];
4054  cols[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempcols[i];
4055  vals[ rowrec * buffperrowneigh + rcurptrs[rowrec] ] = tempvals[i];
4056  ++ (rcurptrs[rowrec]);
4057  }
4058 
4059  #ifdef IODEBUG
4060  std::ofstream oput;
4061  commGrid->OpenDebugFile("Read", oput);
4062  oput << "To row neighbors: ";
4063  std::copy(rcurptrs, rcurptrs+rowneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4064  oput << "Row displacements were: ";
4065  std::copy(rdispls, rdispls+rowneighs, std::ostream_iterator<int>(oput, " ")); oput << std::endl;
4066  oput.close();
4067  #endif
4068 
4069  MPI_Scatter(rcurptrs, 1, MPI_INT, &recvcount, 1, MPI_INT, rankinrow, commGrid->rowWorld); // Send the receive counts for horizontal communication
4070 
4071  // the data is now stored in rows/cols/vals, can reset temporaries
4072  // sets size and capacity to new recvcount
4073  DeleteAll(temprows, tempcols, tempvals);
4074  temprows = new IT[recvcount];
4075  tempcols = new IT[recvcount];
4076  tempvals = new NT[recvcount];
4077 
4078  // then, send all buffers to their recipients ...
4079  MPI_Scatterv(rows, rcurptrs, rdispls, MPIType<IT>(), temprows, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4080  MPI_Scatterv(cols, rcurptrs, rdispls, MPIType<IT>(), tempcols, recvcount, MPIType<IT>(), rankinrow, commGrid->rowWorld);
4081  MPI_Scatterv(vals, rcurptrs, rdispls, MPIType<NT>(), tempvals, recvcount, MPIType<NT>(), rankinrow, commGrid->rowWorld);
4082 
4083  // now push what is ours to tuples
4084  IT moffset = commGrid->myprocrow * m_perproc;
4085  IT noffset = commGrid->myproccol * n_perproc;
4086 
4087  for(int i=0; i< recvcount; ++i)
4088  {
4089  localtuples.push_back( std::make_tuple(temprows[i]-moffset, tempcols[i]-noffset, tempvals[i]) );
4090  }
4091 
4092  std::fill_n(rcurptrs, rowneighs, 0);
4093  DeleteAll(rows, cols, vals, temprows, tempcols, tempvals);
4094 }
4095 
4096 
4099 template <class IT, class NT, class DER>
4100 void SpParMat<IT,NT,DER>::Find (FullyDistVec<IT,IT> & distrows, FullyDistVec<IT,IT> & distcols, FullyDistVec<IT,NT> & distvals) const
4101 {
4102  if((*(distrows.commGrid) != *(distcols.commGrid)) || (*(distcols.commGrid) != *(distvals.commGrid)))
4103  {
4104  SpParHelper::Print("Grids are not comparable, Find() fails!", commGrid->GetWorld());
4105  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
4106  }
4107  IT globallen = getnnz();
4108  SpTuples<IT,NT> Atuples(*spSeq);
4109 
4110  FullyDistVec<IT,IT> nrows ( distrows.commGrid, globallen, 0);
4111  FullyDistVec<IT,IT> ncols ( distcols.commGrid, globallen, 0);
4112  FullyDistVec<IT,NT> nvals ( distvals.commGrid, globallen, NT());
4113 
4114  IT prelen = Atuples.getnnz();
4115  //IT postlen = nrows.MyLocLength();
4116 
4117  int rank = commGrid->GetRank();
4118  int nprocs = commGrid->GetSize();
4119  IT * prelens = new IT[nprocs];
4120  prelens[rank] = prelen;
4121  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
4122  IT prelenuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));
4123 
4124  int * sendcnt = new int[nprocs](); // zero initialize
4125  IT * rows = new IT[prelen];
4126  IT * cols = new IT[prelen];
4127  NT * vals = new NT[prelen];
4128 
4129  int rowrank = commGrid->GetRankInProcRow();
4130  int colrank = commGrid->GetRankInProcCol();
4131  int rowneighs = commGrid->GetGridCols();
4132  int colneighs = commGrid->GetGridRows();
4133  IT * locnrows = new IT[colneighs]; // number of rows is calculated by a reduction among the processor column
4134  IT * locncols = new IT[rowneighs];
4135  locnrows[colrank] = getlocalrows();
4136  locncols[rowrank] = getlocalcols();
4137 
4138  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
4139  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld());
4140 
4141  IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
4142  IT coffset = std::accumulate(locncols, locncols+rowrank, static_cast<IT>(0));
4143 
4144  DeleteAll(locnrows, locncols);
4145  for(int i=0; i< prelen; ++i)
4146  {
4147  IT locid; // ignore local id, data will come in order
4148  int owner = nrows.Owner(prelenuntil+i, locid);
4149  sendcnt[owner]++;
4150 
4151  rows[i] = Atuples.rowindex(i) + roffset; // need the global row index
4152  cols[i] = Atuples.colindex(i) + coffset; // need the global col index
4153  vals[i] = Atuples.numvalue(i);
4154  }
4155 
4156  int * recvcnt = new int[nprocs];
4157  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // get the recv counts
4158 
4159  int * sdpls = new int[nprocs](); // displacements (zero initialized pid)
4160  int * rdpls = new int[nprocs]();
4161  std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
4162  std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
4163 
4164  MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4165  MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4166  MPI_Alltoallv(vals, sendcnt, sdpls, MPIType<NT>(), SpHelper::p2a(nvals.arr), recvcnt, rdpls, MPIType<NT>(), commGrid->GetWorld());
4167 
4168  DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
4169  DeleteAll(prelens, rows, cols, vals);
4170  distrows = nrows;
4171  distcols = ncols;
4172  distvals = nvals;
4173 }
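/*
 * Illustrative usage sketch for Find (vector names are placeholders): the three output
 * vectors end up with global length nnz(A) and hold 0-based global row indices, column
 * indices and values of the nonzeros.
 *
 *   FullyDistVec<int64_t, int64_t> ri(A.getcommgrid());
 *   FullyDistVec<int64_t, int64_t> ci(A.getcommgrid());
 *   FullyDistVec<int64_t, double>  vi(A.getcommgrid());
 *   A.Find(ri, ci, vi);
 */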
4174 
4177 template <class IT, class NT, class DER>
4178 void SpParMat<IT,NT,DER>::Find (FullyDistVec<IT,IT> & distrows, FullyDistVec<IT,IT> & distcols) const
4179 {
4180  if((*(distrows.commGrid) != *(distcols.commGrid)) )
4181  {
4182  SpParHelper::Print("Grids are not comparable, Find() fails!", commGrid->GetWorld());
4183  MPI_Abort(MPI_COMM_WORLD, GRIDMISMATCH);
4184  }
4185  IT globallen = getnnz();
4186  SpTuples<IT,NT> Atuples(*spSeq);
4187 
4188  FullyDistVec<IT,IT> nrows ( distrows.commGrid, globallen, 0);
4189  FullyDistVec<IT,IT> ncols ( distcols.commGrid, globallen, 0);
4190 
4191  IT prelen = Atuples.getnnz();
4192 
4193  int rank = commGrid->GetRank();
4194  int nprocs = commGrid->GetSize();
4195  IT * prelens = new IT[nprocs];
4196  prelens[rank] = prelen;
4197  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(), prelens, 1, MPIType<IT>(), commGrid->GetWorld());
4198  IT prelenuntil = std::accumulate(prelens, prelens+rank, static_cast<IT>(0));
4199 
4200  int * sendcnt = new int[nprocs](); // zero initialize
4201  IT * rows = new IT[prelen];
4202  IT * cols = new IT[prelen];
4203  NT * vals = new NT[prelen];
4204 
4205  int rowrank = commGrid->GetRankInProcRow();
4206  int colrank = commGrid->GetRankInProcCol();
4207  int rowneighs = commGrid->GetGridCols();
4208  int colneighs = commGrid->GetGridRows();
4209  IT * locnrows = new IT[colneighs]; // number of rows is calculated by a reduction among the processor column
4210  IT * locncols = new IT[rowneighs];
4211  locnrows[colrank] = getlocalrows();
4212  locncols[rowrank] = getlocalcols();
4213 
4214  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locnrows, 1, MPIType<IT>(), commGrid->GetColWorld());
4215  MPI_Allgather(MPI_IN_PLACE, 0, MPIType<IT>(),locncols, 1, MPIType<IT>(), commGrid->GetRowWorld()); // local column counts are gathered along the processor row
4216  IT roffset = std::accumulate(locnrows, locnrows+colrank, static_cast<IT>(0));
4217  IT coffset = std::accumulate(locncols, locncols+rowrank, static_cast<IT>(0));
4218 
4219  DeleteAll(locnrows, locncols);
4220  for(int i=0; i< prelen; ++i)
4221  {
4222  IT locid; // ignore local id, data will come in order
4223  int owner = nrows.Owner(prelenuntil+i, locid);
4224  sendcnt[owner]++;
4225 
4226  rows[i] = Atuples.rowindex(i) + roffset; // need the global row index
4227  cols[i] = Atuples.colindex(i) + coffset; // need the global col index
4228  }
4229 
4230  int * recvcnt = new int[nprocs];
4231  MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, commGrid->GetWorld()); // get the recv counts
4232 
4233  int * sdpls = new int[nprocs](); // displacements (zero initialized pid)
4234  int * rdpls = new int[nprocs]();
4235  std::partial_sum(sendcnt, sendcnt+nprocs-1, sdpls+1);
4236  std::partial_sum(recvcnt, recvcnt+nprocs-1, rdpls+1);
4237 
4238  MPI_Alltoallv(rows, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(nrows.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4239  MPI_Alltoallv(cols, sendcnt, sdpls, MPIType<IT>(), SpHelper::p2a(ncols.arr), recvcnt, rdpls, MPIType<IT>(), commGrid->GetWorld());
4240 
4241  DeleteAll(sendcnt, recvcnt, sdpls, rdpls);
4242  DeleteAll(prelens, rows, cols, vals);
4243  distrows = nrows;
4244  distcols = ncols;
4245 }
4246 
4247 template <class IT, class NT, class DER>
4248 std::ofstream& SpParMat<IT,NT,DER>::put(std::ofstream& outfile) const
4249 {
4250  outfile << (*spSeq) << std::endl;
4251  return outfile;
4252 }
4253 
4254 template <class IU, class NU, class UDER>
4255 std::ofstream& operator<<(std::ofstream& outfile, const SpParMat<IU, NU, UDER> & s)
4256 {
4257  return s.put(outfile) ; // use the right put() function
4258 
4259 }
4260 
4268 template <class IT, class NT,class DER>
4269 template <typename LIT>
4270 int SpParMat<IT,NT,DER>::Owner(IT total_m, IT total_n, IT grow, IT gcol, LIT & lrow, LIT & lcol) const
4271 {
4272  int procrows = commGrid->GetGridRows();
4273  int proccols = commGrid->GetGridCols();
4274  IT m_perproc = total_m / procrows;
4275  IT n_perproc = total_n / proccols;
4276 
4277  int own_procrow; // owner's processor row
4278  if(m_perproc != 0)
4279  {
4280  own_procrow = std::min(static_cast<int>(grow / m_perproc), procrows-1); // owner's processor row
4281  }
4282  else // all owned by the last processor row
4283  {
4284  own_procrow = procrows -1;
4285  }
4286  int own_proccol;
4287  if(n_perproc != 0)
4288  {
4289  own_proccol = std::min(static_cast<int>(gcol / n_perproc), proccols-1);
4290  }
4291  else
4292  {
4293  own_proccol = proccols-1;
4294  }
4295  lrow = grow - (own_procrow * m_perproc);
4296  lcol = gcol - (own_proccol * n_perproc);
4297  return commGrid->GetRank(own_procrow, own_proccol);
4298 }
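/*
 * Worked example of the block-distribution arithmetic above: with total_m = total_n = 10
 * on a 3x3 grid, m_perproc = n_perproc = 10/3 = 3. The global entry (grow, gcol) = (9, 4)
 * maps to own_procrow = min(9/3, 2) = 2 (the last processor row also owns the leftover
 * rows) and own_proccol = min(4/3, 2) = 1, with local indices lrow = 9 - 2*3 = 3 and
 * lcol = 4 - 1*3 = 1.
 */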
4299 
4304 template <class IT, class NT,class DER>
4305 void SpParMat<IT,NT,DER>::GetPlaceInGlobalGrid(IT& rowOffset, IT& colOffset) const
4306 {
4307  IT total_rows = getnrow();
4308  IT total_cols = getncol();
4309 
4310  int procrows = commGrid->GetGridRows();
4311  int proccols = commGrid->GetGridCols();
4312  IT rows_perproc = total_rows / procrows;
4313  IT cols_perproc = total_cols / proccols;
4314 
4315  rowOffset = commGrid->GetRankInProcCol()*rows_perproc;
4316  colOffset = commGrid->GetRankInProcRow()*cols_perproc;
4317 }
4318 
4319 }