COMBINATORIAL_BLAS  1.6
SingleChildBFS.cpp
Go to the documentation of this file.
1 #define DETERMINISTIC
2 #include "CombBLAS/CombBLAS.h"
3 #include <mpi.h>
4 #include <sys/time.h>
5 #include <iostream>
6 #include <functional>
7 #include <algorithm>
8 #include <vector>
9 #include <string>
10 #include <sstream>
11 #ifdef THREADED
12  #ifndef _OPENMP
13  #define _OPENMP
14  #endif
15 #endif
16 
17 #ifdef _OPENMP
18  #include <omp.h>
19 #endif
20 
21 
27 #ifdef _OPENMP
28 int cblas_splits = omp_get_max_threads();
29 #else
30 int cblas_splits = 1;
31 #endif
32 
33 #define ITERS 16
34 #define EDGEFACTOR 16
35 using namespace std;
36 using namespace combblas;
37 
38 // 64-bit floor(log2(x)) function
39 // note: least significant bit is the "zeroth" bit
40 // pre: v > 0
41 unsigned int highestbitset(uint64_t v)
42 {
43  // b in binary is {10,1100, 11110000, 1111111100000000 ...}
44  const uint64_t b[] = {0x2ULL, 0xCULL, 0xF0ULL, 0xFF00ULL, 0xFFFF0000ULL, 0xFFFFFFFF00000000ULL};
45  const unsigned int S[] = {1, 2, 4, 8, 16, 32};
46  int i;
47 
48  unsigned int r = 0; // result of log2(v) will go here
49  for (i = 5; i >= 0; i--)
50  {
51  if (v & b[i]) // highestbitset is on the left half (i.e. v > S[i] for sure)
52  {
53  v >>= S[i];
54  r |= S[i];
55  }
56  }
57  return r;
58 }
59 
60 template <class T>
61 bool from_string(T & t, const string& s, std::ios_base& (*f)(std::ios_base&))
62 {
63  istringstream iss(s);
64  return !(iss >> f >> t).fail();
65 }
66 
67 
68 template <typename PARMAT>
69 void Symmetricize(PARMAT & A)
70 {
71  // boolean addition is practically a "logical or"
72  // therefore this doesn't destruct any links
73  PARMAT AT = A;
74  AT.Transpose();
75  A += AT;
76 }
77 
82 struct prunediscovered: public std::binary_function<int64_t, int64_t, int64_t >
83 {
84  int64_t operator()(int64_t x, const int64_t & y) const
85  {
86  return ( y == -1 ) ? x: -1;
87  }
88 };
89 
90 static void MPI_randuniq(void * invec, void * inoutvec, int * len, MPI_Datatype *datatype)
91 {
93  int64_t * inveccast = (int64_t *) invec;
94  int64_t * inoutveccast = (int64_t *) inoutvec;
95  for (int i=0; i<*len; i++ )
96  inoutveccast[i] = RR(inveccast[i], inoutveccast[i]);
97 }
98 
99 int main(int argc, char* argv[])
100 {
101  int nprocs, myrank;
102  MPI_Init(&argc, &argv);
103  MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
104  MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
105  if(argc < 2)
106  {
107  if(myrank == 0)
108  {
109  cout << "Usage: ./scbfs <Scale>" << endl;
110  cout << "Example: mpirun -np 4 ./scbfs 20" << endl;
111  }
112  MPI_Finalize();
113  return -1;
114  }
115  {
118  typedef SpParMat < int64_t, bool, SpDCCols<int32_t,bool> > PSpMat_s32p64; // sequentially use 32-bits for local matrices, but parallel semantics are 64-bits
119  typedef SpParMat < int64_t, int, SpDCCols<int32_t,int> > PSpMat_s32p64_Int; // similarly mixed, but holds integers as upposed to booleans
121 
122  // Declare objects
123  PSpMat_Bool A;
124  PSpMat_s32p64 Aeff;
125  FullyDistVec<int64_t, int64_t> degrees; // degrees of vertices (including multi-edges and self-loops)
126  FullyDistVec<int64_t, int64_t> nonisov; // id's of non-isolated (connected) vertices
127  unsigned scale;
128  OptBuf<int32_t, int64_t> optbuf; // let indices be 32-bits
129 
130  scale = static_cast<unsigned>(atoi(argv[1]));
131  ostringstream outs;
132  outs << "Forcing scale to : " << scale << endl;
133 
134  // this is an undirected graph, so A*x does indeed BFS
135  double initiator[4] = {.57, .19, .19, .05};
136 
137  double t01 = MPI_Wtime();
138  double t02;
140  DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, true ); // generate packed edges
141  SpParHelper::Print("Generated renamed edge lists\n");
142  t02 = MPI_Wtime();
143  ostringstream tinfo;
144  tinfo << "Generation took " << t02-t01 << " seconds" << endl;
145  SpParHelper::Print(tinfo.str());
146 
147 
148  // Start Kernel #1
149  MPI_Barrier(MPI_COMM_WORLD);
150  double t1 = MPI_Wtime();
151 
152  // conversion from distributed edge list, keeps self-loops, sums duplicates
153  PSpMat_s32p64_Int * G = new PSpMat_s32p64_Int(*DEL, false);
154  delete DEL; // free memory before symmetricizing
155  SpParHelper::Print("Created Sparse Matrix (with int32 local indices and values)\n");
156 
157  MPI_Barrier(MPI_COMM_WORLD);
158  double redts = MPI_Wtime();
159  G->Reduce(degrees, Row, plus<int64_t>(), static_cast<int64_t>(0)); // Identity is 0
160  MPI_Barrier(MPI_COMM_WORLD);
161  double redtf = MPI_Wtime();
162 
163  ostringstream redtimeinfo;
164  redtimeinfo << "Calculated degrees in " << redtf-redts << " seconds" << endl;
165  SpParHelper::Print(redtimeinfo.str());
166  A = PSpMat_Bool(*G); // Convert to Boolean
167  delete G;
168  int64_t removed = A.RemoveLoops();
169 
170  ostringstream loopinfo;
171  loopinfo << "Converted to Boolean and removed " << removed << " loops" << endl;
172  SpParHelper::Print(loopinfo.str());
173  A.PrintInfo();
174 
175  FullyDistVec<int64_t, int64_t> * ColSums = new FullyDistVec<int64_t, int64_t>(A.getcommgrid());
176  FullyDistVec<int64_t, int64_t> * RowSums = new FullyDistVec<int64_t, int64_t>(A.getcommgrid());
177  A.Reduce(*ColSums, Column, plus<int64_t>(), static_cast<int64_t>(0));
178  A.Reduce(*RowSums, Row, plus<int64_t>(), static_cast<int64_t>(0));
179  SpParHelper::Print("Reductions done\n");
180  ColSums->EWiseApply(*RowSums, plus<int64_t>());
181  delete RowSums;
182  SpParHelper::Print("Intersection of colsums and rowsums found\n");
183 
184  nonisov = ColSums->FindInds(bind2nd(greater<int64_t>(), 0)); // only the indices of non-isolated vertices
185  delete ColSums;
186 
187  SpParHelper::Print("Found (and permuted) non-isolated vertices\n");
188  nonisov.RandPerm(); // so that A(v,v) is load-balanced (both memory and time wise)
189  A.PrintInfo();
190  #ifndef NOPERMUTE
191  A(nonisov, nonisov, true); // in-place permute to save memory
192  SpParHelper::Print("Dropped isolated vertices from input\n");
193  A.PrintInfo();
194  #endif
195 
196  Aeff = PSpMat_s32p64(A); // Convert to 32-bit local integers
197  A.FreeMemory();
198  Symmetricize(Aeff); // A += A';
199  SpParHelper::Print("Symmetricized\n");
200 
201  Aeff.OptimizeForGraph500(optbuf); // Should be called before threading is activated
202  #ifdef THREADED
203  ostringstream tinfo;
204  tinfo << "Threading activated with " << cblas_splits << " threads" << endl;
205  SpParHelper::Print(tinfo.str());
207  #endif
208  Aeff.PrintInfo();
209 
210  MPI_Barrier(MPI_COMM_WORLD);
211  double t2=MPI_Wtime();
212 
213  ostringstream k1timeinfo;
214  k1timeinfo << (t2-t1) - (redtf-redts) << " seconds elapsed for Kernel #1" << endl;
215  SpParHelper::Print(k1timeinfo.str());
216 
217  Aeff.PrintInfo();
218  float balance = Aeff.LoadImbalance();
219  ostringstream outs2;
220  outs2 << "Load balance: " << balance << endl;
221  SpParHelper::Print(outs2.str());
222 
223  MPI_Barrier(MPI_COMM_WORLD);
224 
225  // Now that every remaining vertex is non-isolated, randomly pick ITERS many of them as starting vertices
226  #ifndef NOPERMUTE
227  degrees = degrees(nonisov); // fix the degrees array too
228  degrees.PrintInfo("Degrees array");
229  #endif
230  // degrees.DebugPrint();
232  double nver = (double) degrees.TotalLength();
233 
234 #ifdef DETERMINISTIC
235  MTRand M(1);
236 #else
237  MTRand M; // generate random numbers with Mersenne Twister
238 #endif
239  vector<double> loccands(ITERS);
240  vector<int64_t> loccandints(ITERS);
241  if(myrank == 0)
242  {
243  for(int i=0; i<ITERS; ++i)
244  loccands[i] = M.rand();
245  copy(loccands.begin(), loccands.end(), ostream_iterator<double>(cout," ")); cout << endl;
246  transform(loccands.begin(), loccands.end(), loccands.begin(), bind2nd( multiplies<double>(), nver ));
247 
248  for(int i=0; i<ITERS; ++i)
249  loccandints[i] = static_cast<int64_t>(loccands[i]);
250  copy(loccandints.begin(), loccandints.end(), ostream_iterator<double>(cout," ")); cout << endl;
251  }
252  MPI_Bcast(&(loccandints[0]), ITERS, MPIType<int64_t>(),0,MPI_COMM_WORLD);
253  for(int i=0; i<ITERS; ++i)
254  Cands.SetElement(i,loccandints[i]);
255 
256  double MTEPS[ITERS]; double INVMTEPS[ITERS]; double TIMES[ITERS]; double EDGES[ITERS];
257  for(int i=0; i<ITERS; ++i)
258  {
259  // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
260  FullyDistVec<int64_t, int64_t> parents ( Aeff.getcommgrid(), Aeff.getncol(), (int64_t) -1); // identity is -1
261 
262  // FullyDistSpVec ( shared_ptr<CommGrid> grid, IT glen);
263  FullyDistSpVec<int64_t, int64_t> fringe(Aeff.getcommgrid(), Aeff.getncol()); // numerical values are stored 0-based
264 
265  MPI_Barrier(MPI_COMM_WORLD);
266  double t1 = MPI_Wtime();
267 
268  fringe.SetElement(Cands[i], Cands[i]);
269  int iterations = 0;
270 
271  MPI_Op randreducempiop;
272  MPI_Op_create(MPI_randuniq, true, &randreducempiop);
273 
274  while(fringe.getnnz() > 0)
275  {
276  fringe.setNumToInd();
277  fringe = SpMV(Aeff, fringe,optbuf); // SpMV with sparse vector (with indexisvalue flag preset), optimization enabled
278  fringe = EWiseMult(fringe, parents, true, (int64_t) -1); // clean-up vertices that already has parents
279  fringe.PrintInfo("Frontier");
280  auto singlechild = fringe.Uniq(RandReduce<int64_t>(), randreducempiop);
281  singlechild.PrintInfo("Single child frontier");
282 
283  parents.Set(fringe);
284  iterations++;
285  }
286  MPI_Barrier(MPI_COMM_WORLD);
287  double t2 = MPI_Wtime();
288 
289  FullyDistSpVec<int64_t, int64_t> parentsp = parents.Find(bind2nd(greater<int64_t>(), -1));
290  parentsp.Apply(myset<int64_t>(1));
291 
292  // we use degrees on the directed graph, so that we don't count the reverse edges in the teps score
293  int64_t nedges = EWiseMult(parentsp, degrees, false, (int64_t) 0).Reduce(plus<int64_t>(), (int64_t) 0);
294 
295  ostringstream outnew;
296  outnew << i << "th starting vertex was " << Cands[i] << endl;
297  outnew << "Number iterations: " << iterations << endl;
298  outnew << "Number of vertices found: " << parentsp.Reduce(plus<int64_t>(), (int64_t) 0) << endl;
299  outnew << "Number of edges traversed: " << nedges << endl;
300  outnew << "BFS time: " << t2-t1 << " seconds" << endl;
301  outnew << "MTEPS: " << static_cast<double>(nedges) / (t2-t1) / 1000000.0 << endl;
302  outnew << "Total communication (average so far): " << (cblas_allgathertime + cblas_alltoalltime) / (i+1) << endl;
303  TIMES[i] = t2-t1;
304  EDGES[i] = nedges;
305  MTEPS[i] = static_cast<double>(nedges) / (t2-t1) / 1000000.0;
306  SpParHelper::Print(outnew.str());
307  }
308  SpParHelper::Print("Finished\n");
309  ostringstream os;
310 
311 
312  sort(EDGES, EDGES+ITERS);
313  os << "--------------------------" << endl;
314  os << "Min nedges: " << EDGES[0] << endl;
315  os << "First Quartile nedges: " << (EDGES[(ITERS/4)-1] + EDGES[ITERS/4])/2 << endl;
316  os << "Median nedges: " << (EDGES[(ITERS/2)-1] + EDGES[ITERS/2])/2 << endl;
317  os << "Third Quartile nedges: " << (EDGES[(3*ITERS/4) -1 ] + EDGES[3*ITERS/4])/2 << endl;
318  os << "Max nedges: " << EDGES[ITERS-1] << endl;
319  double mean = accumulate( EDGES, EDGES+ITERS, 0.0 )/ ITERS;
320  vector<double> zero_mean(ITERS); // find distances to the mean
321  transform(EDGES, EDGES+ITERS, zero_mean.begin(), bind2nd( minus<double>(), mean ));
322  // self inner-product is sum of sum of squares
323  double deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
324  deviation = sqrt( deviation / (ITERS-1) );
325  os << "Mean nedges: " << mean << endl;
326  os << "STDDEV nedges: " << deviation << endl;
327  os << "--------------------------" << endl;
328 
329  sort(TIMES,TIMES+ITERS);
330  os << "Min time: " << TIMES[0] << " seconds" << endl;
331  os << "First Quartile time: " << (TIMES[(ITERS/4)-1] + TIMES[ITERS/4])/2 << " seconds" << endl;
332  os << "Median time: " << (TIMES[(ITERS/2)-1] + TIMES[ITERS/2])/2 << " seconds" << endl;
333  os << "Third Quartile time: " << (TIMES[(3*ITERS/4)-1] + TIMES[3*ITERS/4])/2 << " seconds" << endl;
334  os << "Max time: " << TIMES[ITERS-1] << " seconds" << endl;
335  mean = accumulate( TIMES, TIMES+ITERS, 0.0 )/ ITERS;
336  transform(TIMES, TIMES+ITERS, zero_mean.begin(), bind2nd( minus<double>(), mean ));
337  deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
338  deviation = sqrt( deviation / (ITERS-1) );
339  os << "Mean time: " << mean << " seconds" << endl;
340  os << "STDDEV time: " << deviation << " seconds" << endl;
341  os << "--------------------------" << endl;
342 
343  sort(MTEPS, MTEPS+ITERS);
344  os << "Min MTEPS: " << MTEPS[0] << endl;
345  os << "First Quartile MTEPS: " << (MTEPS[(ITERS/4)-1] + MTEPS[ITERS/4])/2 << endl;
346  os << "Median MTEPS: " << (MTEPS[(ITERS/2)-1] + MTEPS[ITERS/2])/2 << endl;
347  os << "Third Quartile MTEPS: " << (MTEPS[(3*ITERS/4)-1] + MTEPS[3*ITERS/4])/2 << endl;
348  os << "Max MTEPS: " << MTEPS[ITERS-1] << endl;
349  transform(MTEPS, MTEPS+ITERS, INVMTEPS, safemultinv<double>()); // returns inf for zero teps
350  double hteps = static_cast<double>(ITERS) / accumulate(INVMTEPS, INVMTEPS+ITERS, 0.0);
351  os << "Harmonic mean of MTEPS: " << hteps << endl;
352  transform(INVMTEPS, INVMTEPS+ITERS, zero_mean.begin(), bind2nd(minus<double>(), 1/hteps));
353  deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
354  deviation = sqrt( deviation / (ITERS-1) ) * (hteps*hteps); // harmonic_std_dev
355  os << "Harmonic standard deviation of MTEPS: " << deviation << endl;
356  SpParHelper::Print(os.str());
357  }
358  MPI_Finalize();
359  return 0;
360 }
361 
FullyDistVec< IT, NT > Reduce(Dim dim, _BinaryOperation __binary_op, NT id, _UnaryOperation __unary_op) const
Definition: SpParMat.cpp:791
double rand()
std::shared_ptr< CommGrid > getcommgrid() const
Definition: SpParMat.h:275
void ActivateThreading(int numsplits)
Definition: SpParMat.cpp:2881
#define EDGEFACTOR
void SetElement(IT indx, NT numx)
MPI_Datatype MPIType< int64_t >(void)
Definition: MPIType.cpp:64
void Set(const FullyDistSpVec< IT, NT > &rhs)
void GenGraph500Data(double initiator[4], int log_numverts, int edgefactor, bool scramble=false, bool packed=false)
FullyDistVec< IT, IT > FindInds(_Predicate pred) const
Return the indices where pred is true.
SpParMat< int64_t, int, SpDCCols< int32_t, int > > PSpMat_s32p64_Int
Definition: SpMSpVBench.cpp:62
Dcsc< IU, typename promote_trait< NU1, NU2 >::T_promote > EWiseMult(const Dcsc< IU, NU1 > &A, const Dcsc< IU, NU2 > *B, bool exclude)
Definition: Friends.h:694
int cblas_splits
FullyDistSpVec< IT, NT > Find(_Predicate pred) const
Return the elements for which pred is true.
SelectMaxSRing< bool, int32_t > SR
Definition: SpMSpVBench.cpp:59
float LoadImbalance() const
Definition: SpParMat.cpp:665
void EWiseApply(const FullyDistVec< IT, NT2 > &other, _BinaryOperation __binary_op, _BinaryPredicate _do_op, const bool useExtendedBinOp)
void OptimizeForGraph500(OptBuf< LIT, OT > &optbuf)
Definition: SpParMat.cpp:2780
void Symmetricize(PARMAT &A)
With 50/50 chances, return a one of the operants.
Definition: Operations.h:185
double cblas_mergeconttime
NT Reduce(_BinaryOperation __binary_op, NT init) const
double A
int64_t operator()(int64_t x, const int64_t &y) const
double cblas_alltoalltime
bool from_string(T &t, const string &s, std::ios_base &(*f)(std::ios_base &))
long int64_t
Definition: compat.h:21
IT getncol() const
Definition: SpParMat.cpp:694
void PrintInfo() const
Definition: SpParMat.cpp:2387
void PrintInfo(std::string vectorname) const
SpParMat< int64_t, bool, SpDCCols< int64_t, bool > > PSpMat_Bool
Definition: FilteredMIS.cpp:93
double cblas_allgathertime
Definition: CCGrid.h:4
unsigned int highestbitset(uint64_t v)
SpParMat< int64_t, int64_t, SpDCCols< int64_t, int64_t > > PSpMat_Int64
NT Reduce(_BinaryOperation __binary_op, NT identity) const
#define ITERS
void Apply(_UnaryOperation __unary_op)
double cblas_localspmvtime
SpParMat< int64_t, bool, SpDCCols< int32_t, bool > > PSpMat_s32p64
int main(int argc, char *argv[])
double cblas_transvectime