COMBINATORIAL_BLAS  1.6
FilteredMIS.cpp
Go to the documentation of this file.
1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc --------------------------*/
6 /****************************************************************/
7 /*
8  Copyright (c) 2010-2017, The Regents of the University of California
9 
10  Permission is hereby granted, free of charge, to any person obtaining a copy
11  of this software and associated documentation files (the "Software"), to deal
12  in the Software without restriction, including without limitation the rights
13  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  copies of the Software, and to permit persons to whom the Software is
15  furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included in
18  all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  THE SOFTWARE.
27  */
28 
29 #define DETERMINISTIC
30 #include "CombBLAS/CombBLAS.h"
31 #include <mpi.h>
32 #include <sys/time.h>
33 #include <iostream>
34 #include <functional>
35 #include <algorithm>
36 #include <vector>
37 #include <string>
38 #include <sstream>
39 #ifdef THREADED
40  #ifndef _OPENMP
41  #define _OPENMP
42  #endif
43  #include <omp.h>
44 #endif
45 
46 
50 
51 #include "TwitterEdge.h"
52 
53 #define EDGEFACTOR 5 // For MIS
54 #define ITERS 16
55 #define PERCENTS 4 // testing with 4 different percentiles
56 
57 using namespace std;
58 using namespace combblas;
59 
60 
61 template <typename PARMAT>
62 void Symmetricize(PARMAT & A)
63 {
64  // boolean addition is practically a "logical or"
65  // therefore this doesn't destruct any links
66  PARMAT AT = A;
67  AT.Transpose();
68  A += AT;
69 }
70 
71 
72 struct DetSymmetricize: public std::binary_function<TwitterEdge, TwitterEdge, TwitterEdge>
73 {
74  // have to deterministically choose between one of the two values.
75  // cannot just add them because that will change the distribution (small values are unlikely to survive)
77  {
78  TwitterEdge toret = g;
79 
80  if(((g.latest + t.latest) & 1) == 1)
81  {
82  toret.latest = std::min(g.latest, t.latest);
83  }
84  else
85  {
86  toret.latest = std::max(g.latest, t.latest);
87  }
88  return toret;
89  }
90 };
91 
94 
96 {
97  PSpMat_Twitter AT = A;
98  AT.Transpose();
99  // SpParMat<IU,RETT,RETDER> EWiseApply (const SpParMat<IU,NU1,UDERA> & A,
100  // const SpParMat<IU,NU2,UDERB> & B, _BinaryOperation __binary_op, bool notB, const NU2& defaultBVal)
101  // Default B value is irrelevant since the structures of the matrices are
102  A = EWiseApply<TwitterEdge, SpDCCols<int64_t, TwitterEdge > >(A, AT, DetSymmetricize(), false, TwitterEdge());
103 }
104 
105 #ifdef DETERMINISTIC
106 MTRand GlobalMT(1);
107 #else
108 MTRand GlobalMT; // generate random numbers with Mersenne Twister
109 #endif
110 
111 
112 struct Twitter_obj_randomizer : public std::unary_function<TwitterEdge, TwitterEdge>
113 {
114  const TwitterEdge operator()(const TwitterEdge & x) const
115  {
116  short mycount = 1;
117  bool myfollow = 0;
118  time_t mylatest = static_cast<int64_t>(GlobalMT.rand() * 10000); // random.randrange(0,10000)
119 
120  return TwitterEdge(mycount, myfollow, mylatest);
121  }
122 };
123 
124 struct Twitter_materialize: public std::binary_function<TwitterEdge, time_t, bool>
125 {
126  bool operator()(const TwitterEdge & x, time_t sincedate) const
127  {
128  if(x.isRetwitter() && x.LastTweetBy(sincedate))
129  return false; // false if the edge is going to be kept
130  else
131  return true; // true if the edge is to be pruned
132  }
133 };
134 
135 // def rand( verc ):
136 // import random
137 // return random.random()
138 struct randGen : public std::unary_function<double, double>
139 {
140  const double operator()(const double & ignore)
141  {
142  return GlobalMT.rand();
143  }
144 };
145 
146 
147 int main(int argc, char* argv[])
148 {
149  int nprocs, myrank;
150 #ifdef _OPENMP
151  int cblas_splits = omp_get_max_threads();
152  int provided, flag, claimed;
153  MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );
154  MPI_Is_thread_main( &flag );
155  if (!flag)
156  SpParHelper::Print("This thread called init_thread but Is_thread_main gave false\n");
157  MPI_Query_thread( &claimed );
158  if (claimed != provided)
159  SpParHelper::Print("Query thread gave different thread level than requested\n");
160 #else
161  MPI_Init(&argc, &argv);
162  int cblas_splits = 1;
163 #endif
164 
165  MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
166  MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
167  if(argc < 2)
168  {
169  if(myrank == 0)
170  {
171  cout << "Usage: ./FilteredMIS <Scale>" << endl;
172  }
173  MPI_Finalize();
174  return -1;
175  }
176  {
177  // Declare objects
179  shared_ptr<CommGrid> fullWorld;
180  fullWorld.reset( new CommGrid(MPI_COMM_WORLD, 0, 0) );
181  FullyDistVec<int64_t, int64_t> indegrees(fullWorld); // in-degrees of vertices (including multi-edges and self-loops)
182  FullyDistVec<int64_t, int64_t> oudegrees(fullWorld); // out-degrees of vertices (including multi-edges and self-loops)
183  FullyDistVec<int64_t, int64_t> degrees(fullWorld); // combined degrees of vertices (including multi-edges and self-loops)
184  PSpMat_Bool * ABool;
185 
186  SpParHelper::Print("Using synthetic data, which we ALWAYS permute for load balance\n");
187  SpParHelper::Print("We only balance the original input, we don't repermute after each filter change\n");
188  SpParHelper::Print("BFS is run on UNDIRECTED graph, hence hitting CCs, and TEPS is bidirectional\n");
189 
190  double initiator[4] = {.25, .25, .25, .25}; // creating erdos-renyi
191  double t01 = MPI_Wtime();
193 
194  unsigned scale = static_cast<unsigned>(atoi(argv[1]));
195  ostringstream outs;
196  outs << "Forcing scale to : " << scale << endl;
197  SpParHelper::Print(outs.str());
198 
199  // parameters: (double initiator[4], int log_numverts, int edgefactor, bool scramble, bool packed)
200  DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, true ); // generate packed edges
201  SpParHelper::Print("Generated renamed edge lists\n");
202 
203  ABool = new PSpMat_Bool(*DEL, false);
204  int64_t removed = ABool->RemoveLoops();
205  ostringstream loopinfo;
206  loopinfo << "Converted to Boolean and removed " << removed << " loops" << endl;
207  SpParHelper::Print(loopinfo.str());
208  ABool->PrintInfo();
209  delete DEL; // free memory
210  A = PSpMat_Twitter(*ABool); // any upcasting generates the default object
211 
212  double t02 = MPI_Wtime();
213  ostringstream tinfo;
214  tinfo << "Generation took " << t02-t01 << " seconds" << endl;
215  SpParHelper::Print(tinfo.str());
216 
217  // indegrees is sum along rows because A is loaded as "tranposed", similarly oudegrees is sum along columns
218  ABool->PrintInfo();
219  ABool->Reduce(oudegrees, Column, plus<int64_t>(), static_cast<int64_t>(0));
220  ABool->Reduce(indegrees, Row, plus<int64_t>(), static_cast<int64_t>(0));
221 
222  // indegrees_filt and oudegrees_filt is used for the real data
223  FullyDistVec<int64_t, int64_t> indegrees_filt(fullWorld);
224  FullyDistVec<int64_t, int64_t> oudegrees_filt(fullWorld);
225 
226  typedef FullyDistVec<int64_t, int64_t> IntVec; // used for the synthetic data (symmetricized before randomization)
227  FullyDistVec<int64_t, int64_t> degrees_filt[4] = {IntVec(fullWorld), IntVec(fullWorld), IntVec(fullWorld), IntVec(fullWorld)};
228  int64_t keep[PERCENTS] = {100, 1000, 2500, 10000}; // ratio of edges kept in range (0, 10000)
229 
230  degrees = indegrees;
231  degrees.EWiseApply(oudegrees, plus<int64_t>());
232  SpParHelper::Print("All degrees calculated\n");
233  delete ABool;
234 
235  float balance = A.LoadImbalance();
236  ostringstream outlb;
237  outlb << "Load balance: " << balance << endl;
238  SpParHelper::Print(outlb.str());
239 
240  // We symmetricize before we apply the random generator
241  // Otherwise += will naturally add the random numbers together
242  // hence will create artificially high-permeable filters
243  Symmetricize(A); // A += A';
244  SpParHelper::Print("Symmetricized\n");
245 
247  A.PrintInfo();
249  SpParHelper::Print("Symmetricized Rands\n");
250  A.PrintInfo();
251 
252 
253  FullyDistVec<int64_t, int64_t> * nonisov = new FullyDistVec<int64_t, int64_t>(degrees.FindInds(bind2nd(greater<int64_t>(), 0)));
254  SpParHelper::Print("Found (and permuted) non-isolated vertices\n");
255  nonisov->RandPerm(); // so that A(v,v) is load-balanced (both memory and time wise)
256  A(*nonisov, *nonisov, true); // in-place permute to save memory
257  SpParHelper::Print("Dropped isolated vertices from input\n");
258 
259  indegrees = indegrees(*nonisov); // fix the degrees arrays too
260  oudegrees = oudegrees(*nonisov);
261  degrees = degrees(*nonisov);
262  delete nonisov;
263 
264  for (int i=0; i < PERCENTS; i++)
265  {
266  PSpMat_Twitter B = A;
267  B.Prune(bind2nd(Twitter_materialize(), keep[i]));
268  PSpMat_Bool BBool = B;
269  BBool.PrintInfo();
270  float balance = B.LoadImbalance();
271  ostringstream outs;
272  outs << "Load balance of " << static_cast<float>(keep[i])/100 << "% filtered case: " << balance << endl;
273  SpParHelper::Print(outs.str());
274 
275  // degrees_filt[i] is by-default generated as permuted
276  BBool.Reduce(degrees_filt[i], Column, plus<int64_t>(), static_cast<int64_t>(0)); // Column=Row since BBool is symmetric
277  }
278 
279  float balance_former = A.LoadImbalance();
280  ostringstream outs_former;
281  outs_former << "Load balance: " << balance_former << endl;
282  SpParHelper::Print(outs_former.str());
283 
284  for(int trials =0; trials < PERCENTS; trials++)
285  {
287  cblas_alltoalltime = 0;
288  double MISVS[ITERS]; // numbers of vertices in each MIS
289  double TIMES[ITERS];
290 
291  LatestRetwitterMIS::sincedate = keep[trials];
293  ostringstream outs;
294  outs << "Initializing since date (only once) to " << LatestRetwitterMIS::sincedate << endl;
295  SpParHelper::Print(outs.str());
296 
297  for(int sruns = 0; sruns < ITERS; ++sruns)
298  {
299  double t1 = MPI_Wtime();
300  int64_t nvert = A.getncol();
301 
302  //# the final result set. S[i] exists and is 1 if vertex i is in the MIS
303  //S = Vec(nvert, sparse=True)
305 
306  //# the candidate set. initially all vertices are candidates.
307  //# this vector doubles as 'r', the random value vector.
308  //# i.e. if C[i] exists, then i is a candidate. The value C[i] is i's r for this iteration.
309  //C = Vec.ones(nvert, sparse=True)
310  //FullyDistSpVec's length is not the same as its nnz
311  //Since FullyDistSpVec::Apply only affects nonzeros, nnz should be forced to glen
312  // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
315  delete denseC;
316 
317  FullyDistSpVec<int64_t, double> min_neighbor_r ( A.getcommgrid(), nvert);
318  FullyDistSpVec<int64_t, uint8_t> new_S_members ( A.getcommgrid(), nvert);
319  FullyDistSpVec<int64_t, uint8_t> new_S_neighbors ( A.getcommgrid(), nvert);
320 
321  while (C.getnnz() > 0)
322  {
323  //# label each vertex in C with a random value
324  C.Apply(randGen());
325 
326  //# find the smallest random value among a vertex's neighbors
327  //# In other words: min_neighbor_r[i] = min(C[j] for all neighbors j of vertex i)
328  //min_neighbor_r = Gmatrix.SpMV(C, sr(myMin,select2nd)) # could use "min" directly
329  SpMV<LatestRetwitterMIS>(A, C, min_neighbor_r, false); // min_neighbor_r empty OK?
330  #ifdef PRINTITERS
331  min_neighbor_r.PrintInfo("Neighbors");
332  #endif
333 
334  #ifdef DEBUG
335  min_neighbor_r.DebugPrint();
336  C.DebugPrint();
337  #endif
338 
339  //# The vertices to be added to S this iteration are those whose random value is
340  //# smaller than those of all its neighbors:
341  //# new_S_members[i] exists if C[i] < min_neighbor_r[i]
342  //# If C[i] exists and min_neighbor_r[i] doesn't, still a value is returned with bin_op(NULL,C[i])
343  //new_S_members = min_neighbor_r.eWiseApply(C, return1, doOp=is2ndSmaller, allowANulls=True, allowBNulls=False, inPlace=False, ANull=2)
344  new_S_members = EWiseApply<uint8_t>(min_neighbor_r, C, return1_uint8(), is2ndSmaller(), true, false, (double) 2.0, (double) 2.0, true);
348  #ifdef PRINTITERS
349  new_S_members.PrintInfo("New members of the MIS");
350  #endif
351 
352  #ifdef DEBUG
353  new_S_members.DebugPrint();
354  #endif
355 
356  //# new_S_members are no longer candidates, so remove them from C
357  //C.eWiseApply(new_S_members, return1, allowANulls=False, allowIntersect=False, allowBNulls=True, inPlace=True)
358  C = EWiseApply<double>(C, new_S_members, return1_uint8(), return1_uint8(), false, true, (double) 0.0, (uint8_t) 0, false);
359  #ifdef PRINTITERS
360  C.PrintInfo("Entries to be removed from the Candidates set");
361  #endif
362 
363  //# find neighbors of new_S_members
364  //new_S_neighbors = Gmatrix.SpMV(new_S_members, sr(select2nd,select2nd))
365  SpMV<LatestRetwitterSelect2nd>(A, new_S_members, new_S_neighbors, false);
366 
367  //# remove neighbors of new_S_members from C, because they cannot be part of the MIS anymore
368  //# If C[i] exists and new_S_neighbors[i] doesn't, still a value is returned with bin_op(NULL,C[i])
369  //C.eWiseApply(new_S_neighbors, return1, allowANulls=False, allowIntersect=False, allowBNulls=True, inPlace=True)
370  C = EWiseApply<double>(C, new_S_neighbors, return1_uint8(), return1_uint8(), false, true, (double) 0.0, (uint8_t) 0, false);
371  #ifdef PRINTITERS
372  C.PrintInfo("Candidates set after neighbors of MIS removed");
373  #endif
374 
375  //# add new_S_members to S
376  //S.eWiseApply(new_S_members, return1, allowANulls=True, allowBNulls=True, inPlace=True)
377  S = EWiseApply<uint8_t>(S, new_S_members, return1_uint8(), return1_uint8(), true, true, (uint8_t) 1, (uint8_t) 1, true);
378  S.PrintInfo("The current MIS:");
379  }
380  double t2 = MPI_Wtime();
381 
382  ostringstream ositr;
383  ositr << "MIS has " << S.getnnz() << " vertices" << endl;
384  SpParHelper::Print(ositr.str());
385 
386  ostringstream ositr2;
387  ositr << "MIS time: " << t2-t1 << " seconds" << endl;
388  SpParHelper::Print(ositr.str());
389 
390  TIMES[sruns] = t2-t1;
391  MISVS[sruns] = S.getnnz();
392  } // end for(int sruns = 0; sruns < ITERS; ++sruns)
393 
394  ostringstream os;
395  os << "Per iteration communication times: " << endl;
396  os << "AllGatherv: " << cblas_allgathertime / ITERS << endl;
397  os << "AlltoAllv: " << cblas_alltoalltime / ITERS << endl;
398 
399  sort(MISVS, MISVS+ITERS);
400  os << "--------------------------" << endl;
401  os << "Min MIS vertices: " << MISVS[0] << endl;
402  os << "Median MIS vertices: " << (MISVS[(ITERS/2)-1] + MISVS[ITERS/2])/2 << endl;
403  os << "Max MIS vertices: " << MISVS[ITERS-1] << endl;
404  double mean = accumulate( MISVS, MISVS+ITERS, 0.0 )/ ITERS;
405  vector<double> zero_mean(ITERS); // find distances to the mean
406  transform(MISVS, MISVS+ITERS, zero_mean.begin(), bind2nd( minus<double>(), mean ));
407  // self inner-product is sum of sum of squares
408  double deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
409  deviation = sqrt( deviation / (ITERS-1) );
410  os << "Mean MIS vertices: " << mean << endl;
411  os << "STDDEV MIS vertices: " << deviation << endl;
412  os << "--------------------------" << endl;
413 
414  sort(TIMES,TIMES+ITERS);
415  os << "Filter keeps " << static_cast<double>(keep[trials])/100.0 << " percentage of edges" << endl;
416  os << "Min time: " << TIMES[0] << " seconds" << endl;
417  os << "Median time: " << (TIMES[(ITERS/2)-1] + TIMES[ITERS/2])/2 << " seconds" << endl;
418  os << "Max time: " << TIMES[ITERS-1] << " seconds" << endl;
419  mean = accumulate( TIMES, TIMES+ITERS, 0.0 )/ ITERS;
420  transform(TIMES, TIMES+ITERS, zero_mean.begin(), bind2nd( minus<double>(), mean ));
421  deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
422  deviation = sqrt( deviation / (ITERS-1) );
423  os << "Mean time: " << mean << " seconds" << endl;
424  os << "STDDEV time: " << deviation << " seconds" << endl;
425  os << "--------------------------" << endl;
426  SpParHelper::Print(os.str());
427  } // end for(int trials =0; trials < PERCENTS; trials++)
428  }
429  MPI_Finalize();
430  return 0;
431 }
432 
double B
double cblas_alltoalltime
Definition: FilteredMIS.cpp:47
#define PERCENTS
Definition: FilteredMIS.cpp:55
FullyDistVec< IT, NT > Reduce(Dim dim, _BinaryOperation __binary_op, NT id, _UnaryOperation __unary_op) const
Definition: SpParMat.cpp:791
double rand()
std::shared_ptr< CommGrid > getcommgrid() const
Definition: SpParMat.h:275
static time_t sincedate
Definition: TwitterEdge.h:446
void Symmetricize(PARMAT &A)
Definition: FilteredMIS.cpp:62
void Apply(_UnaryOperation __unary_op)
Definition: SpParMat.h:145
void GenGraph500Data(double initiator[4], int log_numverts, int edgefactor, bool scramble=false, bool packed=false)
TwitterEdge operator()(const TwitterEdge &g, const TwitterEdge &t)
Definition: FilteredMIS.cpp:76
FullyDistVec< IT, IT > FindInds(_Predicate pred) const
Return the indices where pred is true.
bool isRetwitter() const
Definition: TwitterEdge.h:24
float LoadImbalance() const
Definition: SpParMat.cpp:665
SpParMat< int64_t, TwitterEdge, SpDCCols< int64_t, TwitterEdge > > PSpMat_Twitter
Definition: FilteredMIS.cpp:92
void EWiseApply(const FullyDistVec< IT, NT2 > &other, _BinaryOperation __binary_op, _BinaryPredicate _do_op, const bool useExtendedBinOp)
void SymmetricizeRands(PSpMat_Twitter &A)
Definition: FilteredMIS.cpp:95
#define ITERS
Definition: FilteredMIS.cpp:54
int cblas_splits
Definition: FilteredMIS.cpp:49
double A
int main(int argc, char *argv[])
bool operator()(const TwitterEdge &x, time_t sincedate) const
long int64_t
Definition: compat.h:21
IT getncol() const
Definition: SpParMat.cpp:694
void PrintInfo() const
Definition: SpParMat.cpp:2387
bool LastTweetBy(time_t end) const
Definition: TwitterEdge.h:27
double C
const TwitterEdge operator()(const TwitterEdge &x) const
SpParMat< int64_t, bool, SpDCCols< int64_t, bool > > PSpMat_Bool
Definition: FilteredMIS.cpp:93
Definition: CCGrid.h:4
double cblas_allgathertime
Definition: FilteredMIS.cpp:48
void Apply(_UnaryOperation __unary_op)
#define EDGEFACTOR
Definition: FilteredMIS.cpp:53
MTRand GlobalMT(1)
const double operator()(const double &ignore)
static time_t sincedate
Definition: TwitterEdge.h:406
SpParMat< IT, NT, DER > Prune(_UnaryOperation __unary_op, bool inPlace=true)
Definition: SpParMat.h:175