48 #include "papi_combblas_global.h" 63 #define MAX_ITERS 20000 67 #define PERCENTS 4 // testing with 4 different percentiles 75 template <
typename PARMAT>
99 time_t mylatest =
static_cast<int64_t>(GlobalMT.
rand() * 10000);
// Reports a failed PAPI call on stderr.
//   errorcode   - return code of a PAPI call; anything other than PAPI_OK is treated as a failure.
//   errorstring - caller-supplied scratch buffer; handed to PAPI_perror together with
//                 PAPI_MAX_STR_LEN and then printed (presumably PAPI_perror fills it with the message text).
// NOTE(review): the embedded numbering jumps (118, 120 and everything after 122 are absent from this
// listing), so statements following the fprintf, if any, are not visible here.
117 void CheckPAPI(
int errorcode,
// The array declarator has to follow the parameter name; `char [] errorstring` does not parse as C++.
char errorstring[])
119 if (errorcode != PAPI_OK)
121 PAPI_perror(errorcode, errorstring, PAPI_MAX_STR_LEN);
122 fprintf(stderr,
"PAPI error (%d): %s\n", errorcode, errorstring);
// Benchmark driver: sets up MPI and PAPI, loads ("File") or generates ("Gen") the input matrix,
// computes degree vectors, runs repeated BFS traversals while sampling PAPI counters, and prints
// per-run and aggregate statistics.
//   argv[1] - "File" or "Gen" (see the usage text below)
//   argv[2] - input file name or scale
//   argv[3] - optional "Double"
// NOTE(review): this is a listing with elided lines (the embedded numbering jumps); braces and
// preprocessor directives are not visible, so block structure below is inferred only where stated.
129 int main(
int argc,
char* argv[])
// MPI start-up. Both MPI_Init_thread and MPI_Init appear; presumably they are alternatives chosen
// by preprocessor lines that this listing omits - confirm against the full file.
134 int provided, flag, claimed;
135 MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided );
136 MPI_Is_thread_main( &flag );
138 SpParHelper::Print(
"This thread called init_thread but Is_thread_main gave false\n");
139 MPI_Query_thread( &claimed );
140 if (claimed != provided)
141 SpParHelper::Print(
"Query thread gave different thread level than requested\n");
143 MPI_Init(&argc, &argv);
144 int cblas_splits = 1;
147 MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
148 MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
// PAPI start-up: initialise the library and complain about a version mismatch.
154 retval = PAPI_library_init(PAPI_VER_CURRENT);
155 if (retval != PAPI_VER_CURRENT && retval > 0)
157 fprintf(stderr,
// "\n" terminates the line; "\e" is a non-standard escape and would not emit a newline.
"PAPI library version mismatch!\n");
160 retval = PAPI_is_initialized();
161 if (retval != PAPI_LOW_LEVEL_INITED)
162 cout <<
"Not initialized" << endl;
// Usage text.
169 cout <<
"Usage: ./FilteredBFS <File, Gen> <Input Name | Scale> (Optional: Double)" << endl;
170 cout <<
"Example: ./FilteredBFS File twitter_small.txt Double" << endl;
// Communication grid over MPI_COMM_WORLD; the matrix A is constructed on it.
178 shared_ptr<CommGrid> fullWorld;
179 fullWorld.reset(
new CommGrid(MPI_COMM_WORLD, 0, 0) );
182 PSpMat_Twitter
A(fullWorld);
188 double t01 = MPI_Wtime();
// "File" mode: input is read as-is (no permutation, isolated vertices kept).
189 if(
string(argv[1]) ==
string(
"File"))
191 SpParHelper::Print(
"Using real data, which we NEVER permute for load balance, also leaving isolated vertices as-is, if any\n");
192 SpParHelper::Print(
"This is because the input is assumed to be load balanced\n");
193 SpParHelper::Print(
"BFS is run on DIRECTED graph, hence hitting SCCs, and TEPS is unidirectional\n");
200 SpParHelper::Print(
"Read input\n");
203 if(argc == 4 &&
string(argv[3]) ==
string(
"Double"))
// "Gen" mode: synthetic input, with the scale taken from argv[2].
212 else if(
string(argv[1]) ==
string(
"Gen"))
214 SpParHelper::Print(
"Using synthetic data, which we ALWAYS permute for load balance\n");
215 SpParHelper::Print(
"We only balance the original input, we don't repermute after each filter change\n");
216 SpParHelper::Print(
"BFS is run on UNDIRECTED graph, hence hitting CCs, and TEPS is bidirectional\n");
218 double initiator[4] = {.57, .19, .19, .05};
219 double t01 = MPI_Wtime();
223 unsigned scale =
static_cast<unsigned>(atoi(argv[2]));
225 outs <<
"Forcing scale to : " << scale << endl;
226 SpParHelper::Print(outs.str());
230 SpParHelper::Print(
"Generated renamed edge lists\n");
234 ostringstream loopinfo;
235 loopinfo <<
"Converted to Boolean and removed " << removed <<
" loops" << endl;
236 SpParHelper::Print(loopinfo.str());
// Any other argv[1] value.
245 SpParHelper::Print(
"Not supported yet\n");
// Report the elapsed time between t01 and t02.
248 double t02 = MPI_Wtime();
250 tinfo <<
"I/O (or generation) took " << t02-t01 <<
" seconds" << endl;
251 SpParHelper::Print(tinfo.str());
// Degree vectors: sum-reduce ABool along Column into oudegrees and along Row into indegrees.
255 ABool->
Reduce(oudegrees,
Column, plus<int64_t>(), static_cast<int64_t>(0));
256 ABool->
Reduce(indegrees,
Row, plus<int64_t>(), static_cast<int64_t>(0));
// "File" mode: build a struct tm for 2009-07-01 00:00:00 and convert it with timegm.
266 if(
string(argv[1]) ==
string(
"File"))
269 memset(&timeinfo, 0,
sizeof(
struct tm));
270 int year, month, day, hour, min, sec;
271 year = 2009; month = 7; day = 1;
272 hour = 0; min = 0; sec = 0;
// struct tm stores years since 1900 and zero-based months.
274 timeinfo.tm_year = year - 1900;
275 timeinfo.tm_mon = month - 1 ;
276 timeinfo.tm_mday = day;
277 timeinfo.tm_hour = hour;
278 timeinfo.tm_min = min;
279 timeinfo.tm_sec = sec;
280 time_t mysincedate = timegm(&timeinfo);
// Work on a copy B of A, convert it to a boolean matrix and reduce it into the *_filt degree vectors.
282 PSpMat_Twitter
B =
A;
284 PSpMat_Bool BBool =
B;
286 BBool.
Reduce(oudegrees_filt,
Column, plus<int64_t>(), static_cast<int64_t>(0));
287 BBool.
Reduce(indegrees_filt,
Row, plus<int64_t>(), static_cast<int64_t>(0));
291 degrees.
EWiseApply(oudegrees, plus<int64_t>());
292 SpParHelper::Print(
"All degrees calculated\n");
297 outs <<
"Load balance: " << balance << endl;
298 SpParHelper::Print(outs.str());
// "Gen" mode: restrict A and the degree vectors to the index set *nonisov.
300 if(
string(argv[1]) == string(
"Gen"))
306 SpParHelper::Print(
"Symmetricized\n");
312 SpParHelper::Print(
"Found (and permuted) non-isolated vertices\n");
314 A(*nonisov, *nonisov,
true);
315 SpParHelper::Print(
"Dropped isolated vertices from input\n");
317 indegrees = indegrees(*nonisov);
318 oudegrees = oudegrees(*nonisov);
319 degrees = degrees(*nonisov);
324 PSpMat_Twitter
B =
A;
326 PSpMat_Bool BBool =
B;
// keep[i] is printed divided by 100.
330 outs <<
"Load balance of " <<
static_cast<float>(keep[i])/100 <<
"% filtered case: " << balance << endl;
331 SpParHelper::Print(outs.str());
334 BBool.
Reduce(degrees_filt[i],
Column, plus<int64_t>(), static_cast<int64_t>(0));
340 ostringstream outs_former;
341 outs_former <<
"Load balance: " << balance_former << endl;
342 SpParHelper::Print(outs_former.str());
344 MPI_Barrier(MPI_COMM_WORLD);
345 double t1 = MPI_Wtime();
348 double nver = (double) degrees.TotalLength();
// One pass per trial; keep[trials] is reported in the summary further down.
351 for(
int trials =0; trials <
MAXTRIALS; trials++)
353 if(
string(argv[1]) ==
string(
"Gen"))
358 SpParHelper::Print(outs.str());
// Timed BFS run: barrier, then start the wall clock (this t1 is a second declaration; presumably an inner scope).
376 MPI_Barrier(MPI_COMM_WORLD);
377 double t1 = MPI_Wtime();
380 char errorstring[PAPI_MAX_STR_LEN+1];
// Seed the traversal with candidate vertex Cands[i].
384 fringe.SetElement(Cands[i], Cands[i]);
385 parents.SetElement(Cands[i],
ParentType(Cands[i]));
// Iterate while the fringe still has entries; each phase is bracketed by PAPI_get_real_usec samples.
387 while(fringe.getnnz() > 0)
396 CheckPAPI(errorcode, errorstring);
397 long_long papi_t_sta = PAPI_get_real_usec();
401 SpMV<LatestRetwitterBFS>(
A, fringe, fringe,
false);
// Same type as papi_t_sta above; `lond_long` looks like a misspelling of `long_long`.
404 long_long papi_t_end = PAPI_get_real_usec();
406 CheckPAPI(errorcode, errorstring);
417 CheckPAPI(errorcode, errorstring);
418 papi_t_sta = PAPI_get_real_usec();
426 papi_t_end = PAPI_get_real_usec();
427 errorcode = PAPI_read_counters(ptr2values, combblas_papi_num_events);
428 CheckPAPI(errorcode, errorstring);
430 errorcode = PAPI_stop_counters(ptr2values, combblas_papi_num_events);
// Store the counter values for the fringe-update phase (one extra slot beyond the event count).
431 papi_this_iterate[
bfs_papi_enum.fringe_updt].resize(combblas_papi_num_events+1);
434 papi_this_iterate[
bfs_papi_enum.fringe_updt][k] = ptr2values[k];
439 CheckPAPI(errorcode, errorstring);
440 papi_t_sta = PAPI_get_real_usec();
446 papi_t_end = PAPI_get_real_usec();
447 errorcode = PAPI_read_counters(ptr2values, combblas_papi_num_events);
448 CheckPAPI(errorcode, errorstring);
450 errorcode = PAPI_stop_counters(ptr2values, combblas_papi_num_events);
// Same bookkeeping for the parents-update phase.
451 papi_this_iterate[
bfs_papi_enum.parents_updt].resize(combblas_papi_num_events+1);
454 papi_this_iterate[
bfs_papi_enum.parents_updt][k] = ptr2values[k];
462 MPI_Barrier(MPI_COMM_WORLD);
463 double t2 = MPI_Wtime();
// Edge counts for this run, obtained by sum-reducing the traversed/processed vectors.
476 int64_t nedges, in_nedges, ou_nedges;
477 if(
string(argv[1]) == string(
"Gen"))
486 in_nedges = intraversed.
Reduce(plus<int64_t>(), (
int64_t) 0);
487 ou_nedges = outraversed.Reduce(plus<int64_t>(), (
int64_t) 0);
488 nedges = in_nedges + ou_nedges;
492 int64_t ou_nedges_processed = ouprocessed.Reduce(plus<int64_t>(), (
int64_t) 0);
493 int64_t nedges_processed = in_nedges_processed + ou_nedges_processed;
// NOTE(review): redeclares names declared just above, so this presumably belongs to an alternative
// preprocessor branch the listing omits. Every counter is zero-initialised explicitly, because
// `a, b, c = 0` initialises only the last declarator and these values are printed below.
495 int64_t in_nedges = 0, ou_nedges = 0, nedges = 0, in_nedges_processed = 0, ou_nedges_processed = 0, nedges_processed = 0;
506 parents.PrintInfo(
"Final parents array");
507 parents.DebugPrint();
// Per-run report; rates are counts divided by (t2-t1) and by 1e6.
510 ostringstream outnew;
511 outnew << i <<
"th starting vertex was " << Cands[i] << endl;
512 outnew <<
"Number iterations: " << iterations << endl;
513 outnew <<
"Number of vertices found: " << parentsp.getnnz() << endl;
514 outnew <<
"Number of edges traversed in both directions: " << nedges << endl;
515 if(
string(argv[1]) ==
string(
"File"))
516 outnew <<
"Number of edges traversed in one direction: " << ou_nedges << endl;
517 outnew <<
"Number of edges processed in both directions: " << nedges_processed << endl;
518 outnew <<
"Number of edges processed in one direction: " << ou_nedges_processed << endl;
519 outnew <<
"BFS time: " << t2-t1 <<
" seconds" << endl;
520 outnew <<
"MTEPS (bidirectional): " <<
static_cast<double>(nedges) / (t2-t1) / 1000000.0 << endl;
521 if(
string(argv[1]) == string(
"File"))
522 outnew <<
"MTEPS (unidirectional): " << static_cast<double>(ou_nedges) / (t2-t1) / 1000000.0 << endl;
523 outnew <<
"MPEPS (bidirectional): " <<
static_cast<double>(nedges_processed) / (t2-t1) / 1000000.0 << endl;
524 outnew <<
"MPEPS (unidirectional): " <<
static_cast<double>(ou_nedges_processed) / (t2-t1) / 1000000.0 << endl;
525 outnew <<
"Total communication (average so far): " << (cblas_allgathertime +
cblas_alltoalltime) / (i+1) << endl;
// PAPI results for this run go to PAPIRES.txt, grouped by iteration and by function label.
529 ostringstream papiout;
530 papiout << i <<
"th starting vertex was " << Cands[i] << endl;
533 for(
int i=0; i < iterations; i++)
535 papiout <<
"Iteration : " << i << endl;
538 papiout <<
"Function : " << bfs_papi_labels[j] << endl;
547 SpParHelper::PrintFile(papiout.str(),
"PAPIRES.txt");
// Record this run: "Gen" counts edges in both directions, otherwise only ou_nedges.
551 TIMES[sruns] = t2-t1;
552 if(
string(argv[1]) == string(
"Gen"))
553 EDGES[sruns] = static_cast<double>(nedges);
555 EDGES[sruns] =
static_cast<double>(ou_nedges);
557 MTEPS[sruns] = EDGES[sruns] / (t2-t1) / 1000000.0;
558 MPEPS[sruns++] =
static_cast<double>(nedges_processed) / (t2-t1) / 1000000.0;
559 SpParHelper::Print(outnew.str());
564 SpParHelper::Print(
"Not enough valid runs done\n");
// Aggregate statistics over the sruns recorded runs.
// NOTE(review): the deviations divide by (sruns-1); the guard for small sruns is not visible in this listing.
570 os << sruns <<
" valid runs done" << endl;
571 os <<
"Connected component lower limite was " <<
CC_LIMIT << endl;
572 os <<
"Per iteration communication times: " << endl;
573 os <<
"AllGatherv: " << cblas_allgathertime / sruns << endl;
// Edge counts: min / median / max from the sorted array, then mean and sample standard deviation.
576 sort(EDGES, EDGES+sruns);
577 os <<
"--------------------------" << endl;
578 os <<
"Min nedges: " << EDGES[0] << endl;
579 os <<
"Median nedges: " << (EDGES[(sruns/2)-1] + EDGES[sruns/2])/2 << endl;
580 os <<
"Max nedges: " << EDGES[sruns-1] << endl;
581 double mean = accumulate( EDGES, EDGES+sruns, 0.0 )/ sruns;
582 vector<double> zero_mean(sruns);
583 transform(EDGES, EDGES+sruns, zero_mean.begin(), bind2nd( minus<double>(), mean ));
585 double deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
586 deviation = sqrt( deviation / (sruns-1) );
587 os <<
"Mean nedges: " << mean << endl;
588 os <<
"STDDEV nedges: " << deviation << endl;
589 os <<
"--------------------------" << endl;
// Run times: same statistics, reusing mean, deviation and zero_mean.
591 sort(TIMES,TIMES+sruns);
592 os <<
"Filter keeps " <<
static_cast<double>(keep[trials])/100.0 <<
" percentage of edges" << endl;
593 os <<
"Min time: " << TIMES[0] <<
" seconds" << endl;
594 os <<
"Median time: " << (TIMES[(sruns/2)-1] + TIMES[sruns/2])/2 <<
" seconds" << endl;
595 os <<
"Max time: " << TIMES[sruns-1] <<
" seconds" << endl;
596 mean = accumulate( TIMES, TIMES+sruns, 0.0 )/ sruns;
597 transform(TIMES, TIMES+sruns, zero_mean.begin(), bind2nd( minus<double>(), mean ));
598 deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
599 deviation = sqrt( deviation / (sruns-1) );
600 os <<
"Mean time: " << mean <<
" seconds" << endl;
601 os <<
"STDDEV time: " << deviation <<
" seconds" << endl;
602 os <<
"--------------------------" << endl;
// MTEPS: order statistics plus harmonic mean and deviation computed from INVMTEPS
// (presumably the reciprocals of MTEPS, filled on a line this listing omits).
604 sort(MTEPS, MTEPS+sruns);
605 os <<
"Min MTEPS: " << MTEPS[0] << endl;
606 os <<
"Median MTEPS: " << (MTEPS[(sruns/2)-1] + MTEPS[sruns/2])/2 << endl;
607 os <<
"Max MTEPS: " << MTEPS[sruns-1] << endl;
609 double hteps =
static_cast<double>(sruns) / accumulate(INVMTEPS, INVMTEPS+sruns, 0.0);
610 os <<
"Harmonic mean of MTEPS: " << hteps << endl;
611 transform(INVMTEPS, INVMTEPS+sruns, zero_mean.begin(), bind2nd(minus<double>(), 1/hteps));
612 deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
613 deviation = sqrt( deviation / (sruns-1) ) * (hteps*hteps);
614 os <<
"Harmonic standard deviation of MTEPS: " << deviation << endl;
// MPEPS: same treatment, using INVMPEPS.
616 sort(MPEPS, MPEPS+sruns);
617 os <<
"Bidirectional Processed Edges per second (to estimate sustained BW)"<< endl;
618 os <<
"Min MPEPS: " << MPEPS[0] << endl;
619 os <<
"Median MPEPS: " << (MPEPS[(sruns/2)-1] + MPEPS[sruns/2])/2 << endl;
620 os <<
"Max MPEPS: " << MPEPS[sruns-1] << endl;
622 double hpeps =
static_cast<double>(sruns) / accumulate(INVMPEPS, INVMPEPS+sruns, 0.0);
623 os <<
"Harmonic mean of MPEPS: " << hpeps << endl;
624 transform(INVMPEPS, INVMPEPS+sruns, zero_mean.begin(), bind2nd(minus<double>(), 1/hpeps));
625 deviation = inner_product( zero_mean.begin(),zero_mean.end(), zero_mean.begin(), 0.0 );
626 deviation = sqrt( deviation / (sruns-1) ) * (hpeps*hpeps);
627 os <<
"Harmonic standard deviation of MPEPS: " << deviation << endl;
628 SpParHelper::Print(os.str());
FullyDistVec< IT, NT > Reduce(Dim dim, _BinaryOperation __binary_op, NT id, _UnaryOperation __unary_op) const
std::shared_ptr< CommGrid > getcommgrid() const
int combblas_papi_events[]
void Apply(_UnaryOperation __unary_op)
void GenGraph500Data(double initiator[4], int log_numverts, int edgefactor, bool scramble=false, bool packed=false)
FullyDistVec< IT, IT > FindInds(_Predicate pred) const
Return the indices where pred is true.
void Symmetricize(PARMAT &A)
std::string bfs_papi_labels
float LoadImbalance() const
double cblas_alltoalltime
SpParMat< int64_t, TwitterEdge, SpDCCols< int64_t, TwitterEdge > > PSpMat_Twitter
void EWiseApply(const FullyDistVec< IT, NT2 > &other, _BinaryOperation __binary_op, _BinaryPredicate _do_op, const bool useExtendedBinOp)
void ReadDistribute(const std::string &filename, int master, bool nonum, HANDLER handler, bool transpose=false, bool pario=false)
std::vector< std::vector< std::vector< long long > > > bfs_counters
NT Reduce(_BinaryOperation __binary_op, NT init) const
std::string combblas_event_names[]
int combblas_papi_num_events
void SelectCandidates(double nver)
ABAB: Put concept check, NT should be integer for this to make sense.
double cblas_allgathertime
int main(int argc, char *argv[])
const TwitterEdge operator()(const TwitterEdge &x) const
SpParMat< int64_t, bool, SpDCCols< int64_t, bool > > PSpMat_Bool
void Apply(_UnaryOperation __unary_op)
SpParMat< IT, NT, DER > Prune(_UnaryOperation __unary_op, bool inPlace=true)