COMBINATORIAL_BLAS  1.6
FilteredBFS.cpp
Go to the documentation of this file.
1 /****************************************************************/
2 /* Parallel Combinatorial BLAS Library (for Graph Computations) */
3 /* version 1.6 -------------------------------------------------*/
4 /* date: 6/15/2017 ---------------------------------------------*/
5 /* authors: Ariful Azad, Aydin Buluc --------------------------*/
6 /****************************************************************/
7 /*
8  Copyright (c) 2010-2017, The Regents of the University of California
9 
10  Permission is hereby granted, free of charge, to any person obtaining a copy
11  of this software and associated documentation files (the "Software"), to deal
12  in the Software without restriction, including without limitation the rights
13  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  copies of the Software, and to permit persons to whom the Software is
15  furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included in
18  all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  THE SOFTWARE.
27  */
28 
29 #define DETERMINISTIC
30 #include "CombBLAS/CombBLAS.h"
31 #include <mpi.h>
32 #include <sys/time.h>
33 #include <iostream>
34 #include <functional>
35 #include <algorithm>
36 #include <vector>
37 #include <string>
38 #include <sstream>
39 #ifdef THREADED
40  #ifndef _OPENMP
41  #define _OPENMP
42  #endif
43  #include <omp.h>
44 #endif
45 
46 #ifdef USE_PAPI
47  #include <papi.h>
48  #include "papi_combblas_global.h"
49 #endif
50 
51 
52 /* Global variables for timing */
53 
57 
58 /* End global variables */
59 
60 
61 #include "TwitterEdge.h"
62 
63 #define MAX_ITERS 20000
64 #define EDGEFACTOR 16
65 #define ITERS 16
66 #define CC_LIMIT 100
67 #define PERCENTS 4 // testing with 4 different percentiles
68 #define MINRUNS 4
69 //#define ONLYTIME // don't calculate TEPS
70 
71 using namespace std;
72 using namespace combblas;
73 
74 
75 template <typename PARMAT>
76 void Symmetricize(PARMAT & A)
77 {
78  // boolean addition is practically a "logical or"
79  // therefore this doesn't destruct any links
80  PARMAT AT = A;
81  AT.Transpose();
82  A += AT;
83 }
84 
85 
86 #ifdef DETERMINISTIC
87 MTRand GlobalMT(1);
88 #else
89 MTRand GlobalMT; // generate random numbers with Mersenne Twister
90 #endif
91 
92 
93 struct Twitter_obj_randomizer : public std::unary_function<TwitterEdge, TwitterEdge>
94 {
95  const TwitterEdge operator()(const TwitterEdge & x) const
96  {
97  short mycount = 1;
98  bool myfollow = 0;
99  time_t mylatest = static_cast<int64_t>(GlobalMT.rand() * 10000); // random.randrange(0,10000)
100 
101  return TwitterEdge(mycount, myfollow, mylatest);
102  }
103 };
104 
105 struct Twitter_materialize: public std::binary_function<TwitterEdge, time_t, bool>
106 {
107  bool operator()(const TwitterEdge & x, time_t sincedate) const
108  {
109  if(x.isRetwitter() && x.LastTweetBy(sincedate))
110  return false; // false if the edge is going to be kept
111  else
112  return true; // true if the edge is to be pruned
113  }
114 };
115 
116 #ifdef USE_PAPI
117 void CheckPAPI(int errorcode, char [] errorstring)
118 {
119  if (errorcode != PAPI_OK)
120  {
121  PAPI_perror(errorcode, errorstring, PAPI_MAX_STR_LEN);
122  fprintf(stderr, "PAPI error (%d): %s\n", errorcode, errorstring);
123  }
124 }
125 #endif
126 
127 
128 
129 int main(int argc, char* argv[])
130 {
131  int nprocs, myrank;
132 #ifdef _OPENMP
133  int cblas_splits = omp_get_max_threads();
134  int provided, flag, claimed;
135  MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );
136  MPI_Is_thread_main( &flag );
137  if (!flag)
138  SpParHelper::Print("This thread called init_thread but Is_thread_main gave false\n");
139  MPI_Query_thread( &claimed );
140  if (claimed != provided)
141  SpParHelper::Print("Query thread gave different thread level than requested\n");
142 #else
143  MPI_Init(&argc, &argv);
144  int cblas_splits = 1;
145 #endif
146 
147  MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
148  MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
149  int MAXTRIALS;
150  int retval;
151 
152 #ifdef USE_PAPI
153  /* Initialize the PAPI library */
154  retval = PAPI_library_init(PAPI_VER_CURRENT);
155  if (retval != PAPI_VER_CURRENT && retval > 0)
156  {
157  fprintf(stderr,"PAPI library version mismatch!\en");
158  exit(1);
159  }
160  retval = PAPI_is_initialized();
161  if (retval != PAPI_LOW_LEVEL_INITED)
162  cout << "Not initialized" << endl;
163 #endif
164 
165  if(argc < 3)
166  {
167  if(myrank == 0)
168  {
169  cout << "Usage: ./FilteredBFS <File, Gen> <Input Name | Scale> (Optional: Double)" << endl;
170  cout << "Example: ./FilteredBFS File twitter_small.txt Double" << endl;
171  }
172  MPI_Finalize();
173  return -1;
174  }
175  {
178  shared_ptr<CommGrid> fullWorld;
179  fullWorld.reset( new CommGrid(MPI_COMM_WORLD, 0, 0) );
180 
181  // Declare objects
182  PSpMat_Twitter A(fullWorld);
183  FullyDistVec<int64_t, int64_t> indegrees(fullWorld); // in-degrees of vertices (including multi-edges and self-loops)
184  FullyDistVec<int64_t, int64_t> oudegrees(fullWorld); // out-degrees of vertices (including multi-edges and self-loops)
185  FullyDistVec<int64_t, int64_t> degrees(fullWorld); // combined degrees of vertices (including multi-edges and self-loops)
186  PSpMat_Bool * ABool;
187 
188  double t01 = MPI_Wtime();
189  if(string(argv[1]) == string("File")) // text|binary input option
190  {
191  SpParHelper::Print("Using real data, which we NEVER permute for load balance, also leaving isolated vertices as-is, if any\n");
192  SpParHelper::Print("This is because the input is assumed to be load balanced\n");
193  SpParHelper::Print("BFS is run on DIRECTED graph, hence hitting SCCs, and TEPS is unidirectional\n");
194 
195  // ReadDistribute (const string & filename, int master, bool nonum, HANDLER handler, bool transpose, bool pario)
196  // if nonum is true, then numerics are not supplied and they are assumed to be all 1's
197  A.ReadDistribute(string(argv[2]), 0, false, TwitterReadSaveHandler<int64_t>(), true, true); // read it from file (and transpose on the fly)
198 
199  A.PrintInfo();
200  SpParHelper::Print("Read input\n");
201 
202  ABool = new PSpMat_Bool(A);
203  if(argc == 4 && string(argv[3]) == string("Double"))
204  {
205  MAXTRIALS = 2;
206  }
207  else
208  {
209  MAXTRIALS = 1;
210  }
211  }
212  else if(string(argv[1]) == string("Gen"))
213  {
214  SpParHelper::Print("Using synthetic data, which we ALWAYS permute for load balance\n");
215  SpParHelper::Print("We only balance the original input, we don't repermute after each filter change\n");
216  SpParHelper::Print("BFS is run on UNDIRECTED graph, hence hitting CCs, and TEPS is bidirectional\n");
217 
218  double initiator[4] = {.57, .19, .19, .05};
219  double t01 = MPI_Wtime();
220  double t02;
222 
223  unsigned scale = static_cast<unsigned>(atoi(argv[2]));
224  ostringstream outs;
225  outs << "Forcing scale to : " << scale << endl;
226  SpParHelper::Print(outs.str());
227 
228  // parameters: (double initiator[4], int log_numverts, int edgefactor, bool scramble, bool packed)
229  DEL->GenGraph500Data(initiator, scale, EDGEFACTOR, true, true ); // generate packed edges
230  SpParHelper::Print("Generated renamed edge lists\n");
231 
232  ABool = new PSpMat_Bool(*DEL, false);
233  int64_t removed = ABool->RemoveLoops();
234  ostringstream loopinfo;
235  loopinfo << "Converted to Boolean and removed " << removed << " loops" << endl;
236  SpParHelper::Print(loopinfo.str());
237  ABool->PrintInfo();
238  delete DEL; // free memory
239  A = PSpMat_Twitter(*ABool); // any upcasting generates the default object
240 
241  MAXTRIALS = PERCENTS; // benchmarking
242  }
243  else
244  {
245  SpParHelper::Print("Not supported yet\n");
246  return 0;
247  }
248  double t02 = MPI_Wtime();
249  ostringstream tinfo;
250  tinfo << "I/O (or generation) took " << t02-t01 << " seconds" << endl;
251  SpParHelper::Print(tinfo.str());
252 
253  // indegrees is sum along rows because A is loaded as "tranposed", similarly oudegrees is sum along columns
254  ABool->PrintInfo();
255  ABool->Reduce(oudegrees, Column, plus<int64_t>(), static_cast<int64_t>(0));
256  ABool->Reduce(indegrees, Row, plus<int64_t>(), static_cast<int64_t>(0));
257 
258  // indegrees_filt and oudegrees_filt is used for the real data
259  FullyDistVec<int64_t, int64_t> indegrees_filt(fullWorld);
260  FullyDistVec<int64_t, int64_t> oudegrees_filt(fullWorld);
261 
262  typedef FullyDistVec<int64_t, int64_t> IntVec; // used for the synthetic data (symmetricized before randomization)
263  FullyDistVec<int64_t, int64_t> degrees_filt[4] = {IntVec(fullWorld), IntVec(fullWorld), IntVec(fullWorld), IntVec(fullWorld)};
264  int64_t keep[PERCENTS] = {100, 1000, 2500, 10000}; // ratio of edges kept in range (0, 10000)
265 
266  if(string(argv[1]) == string("File")) // if using synthetic data, no notion of out/in degrees after randomization exist
267  {
268  struct tm timeinfo;
269  memset(&timeinfo, 0, sizeof(struct tm));
270  int year, month, day, hour, min, sec;
271  year = 2009; month = 7; day = 1;
272  hour = 0; min = 0; sec = 0;
273 
274  timeinfo.tm_year = year - 1900; // year is "years since 1900"
275  timeinfo.tm_mon = month - 1 ; // month is in range 0...11
276  timeinfo.tm_mday = day; // range 1...31
277  timeinfo.tm_hour = hour; // range 0...23
278  timeinfo.tm_min = min; // range 0...59
279  timeinfo.tm_sec = sec; // range 0.
280  time_t mysincedate = timegm(&timeinfo);
281 
282  PSpMat_Twitter B = A;
283  B.Prune(bind2nd(Twitter_materialize(), mysincedate));
284  PSpMat_Bool BBool = B;
285  BBool.PrintInfo();
286  BBool.Reduce(oudegrees_filt, Column, plus<int64_t>(), static_cast<int64_t>(0));
287  BBool.Reduce(indegrees_filt, Row, plus<int64_t>(), static_cast<int64_t>(0));
288  }
289 
290  degrees = indegrees;
291  degrees.EWiseApply(oudegrees, plus<int64_t>());
292  SpParHelper::Print("All degrees calculated\n");
293  delete ABool;
294 
295  float balance = A.LoadImbalance();
296  ostringstream outs;
297  outs << "Load balance: " << balance << endl;
298  SpParHelper::Print(outs.str());
299 
300  if(string(argv[1]) == string("Gen"))
301  {
302  // We symmetricize before we apply the random generator
303  // Otherwise += will naturally add the random numbers together
304  // hence will create artificially high-permeable filters
305  Symmetricize(A); // A += A';
306  SpParHelper::Print("Symmetricized\n");
307 
309  A.PrintInfo();
310 
311  FullyDistVec<int64_t, int64_t> * nonisov = new FullyDistVec<int64_t, int64_t>(degrees.FindInds(bind2nd(greater<int64_t>(), 0)));
312  SpParHelper::Print("Found (and permuted) non-isolated vertices\n");
313  nonisov->RandPerm(); // so that A(v,v) is load-balanced (both memory and time wise)
314  A(*nonisov, *nonisov, true); // in-place permute to save memory
315  SpParHelper::Print("Dropped isolated vertices from input\n");
316 
317  indegrees = indegrees(*nonisov); // fix the degrees arrays too
318  oudegrees = oudegrees(*nonisov);
319  degrees = degrees(*nonisov);
320  delete nonisov;
321 
322  for (int i=0; i < PERCENTS; i++)
323  {
324  PSpMat_Twitter B = A;
325  B.Prune(bind2nd(Twitter_materialize(), keep[i]));
326  PSpMat_Bool BBool = B;
327  BBool.PrintInfo();
328  float balance = B.LoadImbalance();
329  ostringstream outs;
330  outs << "Load balance of " << static_cast<float>(keep[i])/100 << "% filtered case: " << balance << endl;
331  SpParHelper::Print(outs.str());
332 
333  // degrees_filt[i] is by-default generated as permuted
334  BBool.Reduce(degrees_filt[i], Column, plus<int64_t>(), static_cast<int64_t>(0)); // Column=Row since BBool is symmetric
335  }
336  }
337  // real data is pre-balanced, because otherwise even the load balancing routine crushes due to load imbalance
338 
339  float balance_former = A.LoadImbalance();
340  ostringstream outs_former;
341  outs_former << "Load balance: " << balance_former << endl;
342  SpParHelper::Print(outs_former.str());
343 
344  MPI_Barrier(MPI_COMM_WORLD);
345  double t1 = MPI_Wtime();
346 
348  double nver = (double) degrees.TotalLength();
349  Cands.SelectCandidates(nver);
350 
351  for(int trials =0; trials < MAXTRIALS; trials++)
352  {
353  if(string(argv[1]) == string("Gen"))
354  {
355  LatestRetwitterBFS::sincedate = keep[trials];
356  ostringstream outs;
357  outs << "Initializing since date (only once) to " << LatestRetwitterBFS::sincedate << endl;
358  SpParHelper::Print(outs.str());
359  }
360 
362  cblas_alltoalltime = 0;
363 
364  double MTEPS[ITERS]; double INVMTEPS[ITERS]; double TIMES[ITERS]; double EDGES[ITERS];
365  double MPEPS[ITERS]; double INVMPEPS[ITERS];
366  int sruns = 0; // successful runs
367  for(int i=0; i<MAX_ITERS && sruns < ITERS; ++i)
368  {
369 
370  // FullyDistVec ( shared_ptr<CommGrid> grid, IT globallen, NT initval);
372 
373  // FullyDistSpVec ( shared_ptr<CommGrid> grid, IT glen);
375 
376  MPI_Barrier(MPI_COMM_WORLD);
377  double t1 = MPI_Wtime();
378 
379  #ifdef USE_PAPI
380  char errorstring[PAPI_MAX_STR_LEN+1];
381  long long ptr2values[combblas_papi_num_events];
382  #endif
383 
384  fringe.SetElement(Cands[i], Cands[i]);
385  parents.SetElement(Cands[i], ParentType(Cands[i])); // make root discovered
386  int iterations = 0;
387  while(fringe.getnnz() > 0)
388  {
389  #ifdef USE_PAPI
390  vector< vector<long long> > papi_this_iterate(num_bfs_papi_labels);
391  #endif
392  fringe.ApplyInd(NumSetter);
393 
394  #ifdef USE_PAPI
395  int errorcode = PAPI_start_counters(combblas_papi_events, combblas_papi_num_events);
396  CheckPAPI(errorcode, errorstring);
397  long_long papi_t_sta = PAPI_get_real_usec();
398  #endif
399 
400  // SpMV with sparse vector, optimizations disabled for generality
401  SpMV<LatestRetwitterBFS>(A, fringe, fringe, false);
402 
403  #ifdef USE_PAPI
404  lond_long papi_t_end = PAPI_get_real_usec();
405  errorcode = PAPI_read_counters(ptr2values, combblas_papi_num_events);
406  CheckPAPI(errorcode, errorstring);
407 
408  errorcode = PAPI_stop_counters(ptr2values, combblas_papi_num_events);
409  papi_this_iterate[bfs_papi_enum.SpMV].resize(combblas_papi_num_events+1); // +1 for the time
410  for(int k=0; k<combblas_papi_num_events; ++k)
411  {
412  papi_this_iterate[bfs_papi_enum.SpMV][k] = ptr2values[k];
413  }
414  papi_this_iterate[bfs_papi_enum.SpMV][combblas_papi_num_events] = papi_t_end - papi_s_end; // time in usec
415 
416  PAPI_start_counters(combblas_papi_events, combblas_papi_num_events);
417  CheckPAPI(errorcode, errorstring);
418  papi_t_sta = PAPI_get_real_usec();
419  #endif
420 
421  // EWiseApply (const FullyDistSpVec<IU,NU1> & V, const FullyDistVec<IU,NU2> & W,
422  // _BinaryOperation _binary_op, _BinaryPredicate _doOp, bool allowVNulls, NU1 Vzero)
423  fringe = EWiseApply<ParentType>(fringe, parents, getfringe(), keepinfrontier_f(), true, ParentType());
424 
425  #ifdef USE_PAPI
426  papi_t_end = PAPI_get_real_usec();
427  errorcode = PAPI_read_counters(ptr2values, combblas_papi_num_events);
428  CheckPAPI(errorcode, errorstring);
429 
430  errorcode = PAPI_stop_counters(ptr2values, combblas_papi_num_events);
431  papi_this_iterate[bfs_papi_enum.fringe_updt].resize(combblas_papi_num_events+1); // +1 for the time
432  for(int k=0; k<combblas_papi_num_events; ++k)
433  {
434  papi_this_iterate[bfs_papi_enum.fringe_updt][k] = ptr2values[k];
435  }
436  papi_this_iterate[bfs_papi_enum.fringe_updt][combblas_papi_num_events] = papi_t_end - papi_s_end; // time in usec
437 
438  PAPI_start_counters(combblas_papi_events, combblas_papi_num_events);
439  CheckPAPI(errorcode, errorstring);
440  papi_t_sta = PAPI_get_real_usec();
441  #endif
442 
443  parents += fringe; // ABAB: Convert this to EwiseApply for compliance in PAPI timings
444 
445  #ifdef USE_PAPI
446  papi_t_end = PAPI_get_real_usec();
447  errorcode = PAPI_read_counters(ptr2values, combblas_papi_num_events);
448  CheckPAPI(errorcode, errorstring);
449 
450  errorcode = PAPI_stop_counters(ptr2values, combblas_papi_num_events);
451  papi_this_iterate[bfs_papi_enum.parents_updt].resize(combblas_papi_num_events+1); // +1 for the time
452  for(int k=0; k<combblas_papi_num_events; ++k)
453  {
454  papi_this_iterate[bfs_papi_enum.parents_updt][k] = ptr2values[k];
455  }
456  papi_this_iterate[bfs_papi_enum.parents_updt][combblas_papi_num_events] = papi_t_end - papi_s_end; // time in usec
457  bfs_counters.push_back(papi_this_iterate); // copy to bfs_counters[iterations]
458  #endif
459 
460  iterations++;
461  }
462  MPI_Barrier(MPI_COMM_WORLD);
463  double t2 = MPI_Wtime();
464 
465 
466  FullyDistSpVec<int64_t, ParentType> parentsp = parents.Find(isparentset());
467  parentsp.Apply(myset<ParentType>(ParentType(1)));
468 
469 #ifndef ONLYTIME
470  FullyDistSpVec<int64_t, int64_t> intraversed(fullWorld);
471  FullyDistSpVec<int64_t, int64_t> inprocessed(fullWorld);
472  FullyDistSpVec<int64_t, int64_t> outraversed(fullWorld);
473  FullyDistSpVec<int64_t, int64_t> ouprocessed(fullWorld);
474  inprocessed = EWiseApply<int64_t>(parentsp, indegrees, seldegree(), passifthere(), true, ParentType());
475  ouprocessed = EWiseApply<int64_t>(parentsp, oudegrees, seldegree(), passifthere(), true, ParentType());
476  int64_t nedges, in_nedges, ou_nedges;
477  if(string(argv[1]) == string("Gen"))
478  {
479  FullyDistSpVec<int64_t, int64_t> traversed = EWiseApply<int64_t>(parentsp, degrees_filt[trials], seldegree(), passifthere(), true, ParentType());
480  nedges = traversed.Reduce(plus<int64_t>(), (int64_t) 0);
481  }
482  else
483  {
484  intraversed = EWiseApply<int64_t>(parentsp, indegrees_filt, seldegree(), passifthere(), true, ParentType());
485  outraversed = EWiseApply<int64_t>(parentsp, oudegrees_filt, seldegree(), passifthere(), true, ParentType());
486  in_nedges = intraversed.Reduce(plus<int64_t>(), (int64_t) 0);
487  ou_nedges = outraversed.Reduce(plus<int64_t>(), (int64_t) 0);
488  nedges = in_nedges + ou_nedges; // count birectional edges twice
489  }
490 
491  int64_t in_nedges_processed = inprocessed.Reduce(plus<int64_t>(), (int64_t) 0);
492  int64_t ou_nedges_processed = ouprocessed.Reduce(plus<int64_t>(), (int64_t) 0);
493  int64_t nedges_processed = in_nedges_processed + ou_nedges_processed; // count birectional edges twice
494 #else
495  int64_t in_nedges, ou_nedges, nedges, in_nedges_processed, ou_nedges_processed, nedges_processed = 0;
496 #endif
497 
498  if(parentsp.getnnz() > CC_LIMIT)
499  {
500  // intraversed.PrintInfo("Incoming edges traversed per vertex");
501  // intraversed.DebugPrint();
502  // outraversed.PrintInfo("Outgoing edges traversed per vertex");
503  // outraversed.DebugPrint();
504 
505  #ifdef DEBUG
506  parents.PrintInfo("Final parents array");
507  parents.DebugPrint();
508  #endif
509 
510  ostringstream outnew;
511  outnew << i << "th starting vertex was " << Cands[i] << endl;
512  outnew << "Number iterations: " << iterations << endl;
513  outnew << "Number of vertices found: " << parentsp.getnnz() << endl;
514  outnew << "Number of edges traversed in both directions: " << nedges << endl;
515  if(string(argv[1]) == string("File"))
516  outnew << "Number of edges traversed in one direction: " << ou_nedges << endl;
517  outnew << "Number of edges processed in both directions: " << nedges_processed << endl;
518  outnew << "Number of edges processed in one direction: " << ou_nedges_processed << endl;
519  outnew << "BFS time: " << t2-t1 << " seconds" << endl;
520  outnew << "MTEPS (bidirectional): " << static_cast<double>(nedges) / (t2-t1) / 1000000.0 << endl;
521  if(string(argv[1]) == string("File"))
522  outnew << "MTEPS (unidirectional): " << static_cast<double>(ou_nedges) / (t2-t1) / 1000000.0 << endl;
523  outnew << "MPEPS (bidirectional): " << static_cast<double>(nedges_processed) / (t2-t1) / 1000000.0 << endl;
524  outnew << "MPEPS (unidirectional): " << static_cast<double>(ou_nedges_processed) / (t2-t1) / 1000000.0 << endl;
525  outnew << "Total communication (average so far): " << (cblas_allgathertime + cblas_alltoalltime) / (i+1) << endl;
526 
527  /* Write to PAPI */
528  #ifdef USE_PAPI
529  ostringstream papiout;
530  papiout << i << "th starting vertex was " << Cands[i] << endl;
531  papiout << "Threshold is " << LatestRetwitterBFS::sincedate << endl;
532 
533  for(int i=0; i < iterations; i++) // over all spmv iterations in this BFS
534  {
535  papiout << "Iteration : " << i << endl;
536  for(int j=0; j < bfs_papi_labels; ++j)
537  {
538  papiout << "Function : " << bfs_papi_labels[j] << endl;
539 
540  for(int k=0; k < combblas_papi_num_events; ++k)
541  {
542  papiout << combblas_event_names[k] << ":\t" << bfs_counters[i][j][k] << endl;
543  }
544  papiout << "Time (usec)" << ":\t" << bfs_counters[i][j][combblas_papi_num_events] << endl;
545  }
546  }
547  SpParHelper::PrintFile(papiout.str(), "PAPIRES.txt");
548  #endif
549  /* (End) Write to PAPI */
550 
551  TIMES[sruns] = t2-t1;
552  if(string(argv[1]) == string("Gen"))
553  EDGES[sruns] = static_cast<double>(nedges);
554  else
555  EDGES[sruns] = static_cast<double>(ou_nedges);
556 
557  MTEPS[sruns] = EDGES[sruns] / (t2-t1) / 1000000.0;
558  MPEPS[sruns++] = static_cast<double>(nedges_processed) / (t2-t1) / 1000000.0;
559  SpParHelper::Print(outnew.str());
560  }
561  }
562  if (sruns < MINRUNS)
563  {
564  SpParHelper::Print("Not enough valid runs done\n");
565  MPI_Finalize();
566  return 0;
567  }
568  ostringstream os;
569 
570  os << sruns << " valid runs done" << endl;
571  os << "Connected component lower limite was " << CC_LIMIT << endl;
572  os << "Per iteration communication times: " << endl;
573  os << "AllGatherv: " << cblas_allgathertime / sruns << endl;
574  os << "AlltoAllv: " << cblas_alltoalltime / sruns << endl;
575 
576  sort(EDGES, EDGES+sruns);
577  os << "--------------------------" << endl;
578  os << "Min nedges: " << EDGES[0] << endl;
579  os << "Median nedges: " << (EDGES[(sruns/2)-1] + EDGES[sruns/2])/2 << endl;
580  os << "Max nedges: " << EDGES[sruns-1] << endl;
581  double mean = accumulate( EDGES, EDGES+sruns, 0.0 )/ sruns;
582  vector<double> zero_mean(sruns); // find distances to the mean
583  transform(EDGES, EDGES+sruns, zero_mean.begin(), bind2nd( minus<double>(), mean ));
584  // self inner-product is sum of sum of squares
585  double deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
586  deviation = sqrt( deviation / (sruns-1) );
587  os << "Mean nedges: " << mean << endl;
588  os << "STDDEV nedges: " << deviation << endl;
589  os << "--------------------------" << endl;
590 
591  sort(TIMES,TIMES+sruns);
592  os << "Filter keeps " << static_cast<double>(keep[trials])/100.0 << " percentage of edges" << endl;
593  os << "Min time: " << TIMES[0] << " seconds" << endl;
594  os << "Median time: " << (TIMES[(sruns/2)-1] + TIMES[sruns/2])/2 << " seconds" << endl;
595  os << "Max time: " << TIMES[sruns-1] << " seconds" << endl;
596  mean = accumulate( TIMES, TIMES+sruns, 0.0 )/ sruns;
597  transform(TIMES, TIMES+sruns, zero_mean.begin(), bind2nd( minus<double>(), mean ));
598  deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
599  deviation = sqrt( deviation / (sruns-1) );
600  os << "Mean time: " << mean << " seconds" << endl;
601  os << "STDDEV time: " << deviation << " seconds" << endl;
602  os << "--------------------------" << endl;
603 
604  sort(MTEPS, MTEPS+sruns);
605  os << "Min MTEPS: " << MTEPS[0] << endl;
606  os << "Median MTEPS: " << (MTEPS[(sruns/2)-1] + MTEPS[sruns/2])/2 << endl;
607  os << "Max MTEPS: " << MTEPS[sruns-1] << endl;
608  transform(MTEPS, MTEPS+sruns, INVMTEPS, safemultinv<double>()); // returns inf for zero teps
609  double hteps = static_cast<double>(sruns) / accumulate(INVMTEPS, INVMTEPS+sruns, 0.0);
610  os << "Harmonic mean of MTEPS: " << hteps << endl;
611  transform(INVMTEPS, INVMTEPS+sruns, zero_mean.begin(), bind2nd(minus<double>(), 1/hteps));
612  deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
613  deviation = sqrt( deviation / (sruns-1) ) * (hteps*hteps); // harmonic_std_dev
614  os << "Harmonic standard deviation of MTEPS: " << deviation << endl;
615 
616  sort(MPEPS, MPEPS+sruns);
617  os << "Bidirectional Processed Edges per second (to estimate sustained BW)"<< endl;
618  os << "Min MPEPS: " << MPEPS[0] << endl;
619  os << "Median MPEPS: " << (MPEPS[(sruns/2)-1] + MPEPS[sruns/2])/2 << endl;
620  os << "Max MPEPS: " << MPEPS[sruns-1] << endl;
621  transform(MPEPS, MPEPS+sruns, INVMPEPS, safemultinv<double>()); // returns inf for zero teps
622  double hpeps = static_cast<double>(sruns) / accumulate(INVMPEPS, INVMPEPS+sruns, 0.0);
623  os << "Harmonic mean of MPEPS: " << hpeps << endl;
624  transform(INVMPEPS, INVMPEPS+sruns, zero_mean.begin(), bind2nd(minus<double>(), 1/hpeps));
625  deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
626  deviation = sqrt( deviation / (sruns-1) ) * (hpeps*hpeps); // harmonic_std_dev
627  os << "Harmonic standard deviation of MPEPS: " << deviation << endl;
628  SpParHelper::Print(os.str());
629  }
630  }
631  MPI_Finalize();
632  return 0;
633 }
634 
double B
FullyDistVec< IT, NT > Reduce(Dim dim, _BinaryOperation __binary_op, NT id, _UnaryOperation __unary_op) const
Definition: SpParMat.cpp:791
double rand()
std::shared_ptr< CommGrid > getcommgrid() const
Definition: SpParMat.h:275
int combblas_papi_events[]
ParentType NumSetter(ParentType &num, int64_t index)
Definition: TwitterEdge.h:193
void Apply(_UnaryOperation __unary_op)
Definition: SpParMat.h:145
void GenGraph500Data(double initiator[4], int log_numverts, int edgefactor, bool scramble=false, bool packed=false)
FullyDistVec< IT, IT > FindInds(_Predicate pred) const
Return the indices where pred is true.
void Symmetricize(PARMAT &A)
Definition: FilteredBFS.cpp:76
#define PERCENTS
Definition: FilteredBFS.cpp:67
#define ITERS
Definition: FilteredBFS.cpp:65
bool isRetwitter() const
Definition: TwitterEdge.h:24
std::string bfs_papi_labels
float LoadImbalance() const
Definition: SpParMat.cpp:665
double cblas_alltoalltime
Definition: FilteredBFS.cpp:54
int num_bfs_papi_labels
SpParMat< int64_t, TwitterEdge, SpDCCols< int64_t, TwitterEdge > > PSpMat_Twitter
Definition: FilteredMIS.cpp:92
void EWiseApply(const FullyDistVec< IT, NT2 > &other, _BinaryOperation __binary_op, _BinaryPredicate _do_op, const bool useExtendedBinOp)
#define CC_LIMIT
Definition: FilteredBFS.cpp:66
void ReadDistribute(const std::string &filename, int master, bool nonum, HANDLER handler, bool transpose=false, bool pario=false)
Definition: SpParMat.cpp:3560
std::vector< std::vector< std::vector< long long > > > bfs_counters
NT Reduce(_BinaryOperation __binary_op, NT init) const
std::string combblas_event_names[]
#define MINRUNS
Definition: FilteredBFS.cpp:68
double A
int combblas_papi_num_events
#define MAXTRIALS
void SelectCandidates(double nver)
ABAB: Put concept check, NT should be integer for this to make sense.
bool operator()(const TwitterEdge &x, time_t sincedate) const
double cblas_allgathertime
Definition: FilteredBFS.cpp:55
int main(int argc, char *argv[])
long int64_t
Definition: compat.h:21
int cblas_splits
Definition: FilteredBFS.cpp:56
IT getncol() const
Definition: SpParMat.cpp:694
void PrintInfo() const
Definition: SpParMat.cpp:2387
bool LastTweetBy(time_t end) const
Definition: TwitterEdge.h:27
const TwitterEdge operator()(const TwitterEdge &x) const
Definition: FilteredBFS.cpp:95
SpParMat< int64_t, bool, SpDCCols< int64_t, bool > > PSpMat_Bool
Definition: FilteredMIS.cpp:93
#define MAX_ITERS
Definition: FilteredBFS.cpp:63
static time_t sincedate
Definition: TwitterEdge.h:280
Definition: CCGrid.h:4
#define EDGEFACTOR
Definition: FilteredBFS.cpp:64
MTRand GlobalMT(1)
void Apply(_UnaryOperation __unary_op)
SpParMat< IT, NT, DER > Prune(_UnaryOperation __unary_op, bool inPlace=true)
Definition: SpParMat.h:175