COMBINATORIAL_BLAS  1.6
BetwCent.cpp
/****************************************************************/
/* Parallel Combinatorial BLAS Library (for Graph Computations) */
/* version 1.6 -------------------------------------------------*/
/* date: 6/15/2017 ---------------------------------------------*/
/* authors: Ariful Azad, Aydin Buluc --------------------------*/
/****************************************************************/
/*
 Copyright (c) 2010-2017, The Regents of the University of California

 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:

 The above copyright notice and this permission notice shall be included in
 all copies or substantial portions of the Software.

 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.
 */

// These macros should be defined before stdint.h is included
#ifndef __STDC_CONSTANT_MACROS
#define __STDC_CONSTANT_MACROS
#endif
#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif
#include <stdint.h>
#include <mpi.h>
#include <iostream>
#include <fstream>
#include <string>
#include <sstream> // Required for stringstreams
#include <ctime>
#include <cmath>
#include "CombBLAS/CombBLAS.h"

using namespace combblas;
using namespace std;

// Simple helper class for declarations: only the numerical type is templated.
// The index type and the sequential matrix type stay the same throughout the code.
// In this case, they are "int" and "SpDCCols".
template <class NT>
class Dist
{
public:
    typedef SpDCCols<int, NT> DCCols;
    typedef SpParMat<int, NT, DCCols> MPI_DCCols;
};

int main(int argc, char* argv[])
{
    int nprocs, myrank;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

    typedef PlusTimesSRing<bool, int> PTBOOLINT;
    typedef PlusTimesSRing<bool, double> PTBOOLDOUBLE;

    if(argc < 4)
    {
        if(myrank == 0)
        {
            cout << "Usage: ./betwcent <BASEADDRESS> <K4APPROX> <BATCHSIZE> <output file - optional>" << endl;
            cout << "Example: ./betwcent Data/ 15 128" << endl;
            cout << "Input file input.mtx should be under <BASEADDRESS> in matrix market format" << endl;
            cout << "<BATCHSIZE> should be a multiple of sqrt(p)" << endl;
            cout << "(<BATCHSIZE> and <K4APPROX> both refer to the overall matrix, not to per-process submatrices)" << endl;
        }
        MPI_Finalize();
        return -1;
    }

    {
        int K4Approx = atoi(argv[2]);
        int batchSize = atoi(argv[3]);

        string directory(argv[1]);
        string ifilename = "input.mtx";
        ifilename = directory + "/" + ifilename;

        shared_ptr<CommGrid> fullWorld;
        fullWorld.reset( new CommGrid(MPI_COMM_WORLD, 0, 0) );

        Dist<bool>::MPI_DCCols A(fullWorld);
        Dist<bool>::MPI_DCCols AT(fullWorld);   // construct object
        AT.ParallelReadMM(ifilename, true, maximum<double>()); // read it from file, note that we use the transpose of "input" data
        A = AT;
        A.Transpose();

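        // Approximate BC: BFS is run only from nPasses = 2^K4APPROX starting vertices (roots),
        // processed in batches of <BATCHSIZE>, instead of from every vertex of the graph.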
        int nPasses = (int) pow(2.0, K4Approx);
        int numBatches = (int) ceil( static_cast<float>(nPasses) / static_cast<float>(batchSize));

        // number of batch vertices per processor column (i.e., per local submatrix)
        int subBatchSize = batchSize / (AT.getcommgrid())->GetGridCols();
        int nBatchSize = subBatchSize * (AT.getcommgrid())->GetGridCols();
        nPasses = numBatches * nBatchSize;  // update the number of starting vertices

        if(batchSize % (AT.getcommgrid())->GetGridCols() > 0 && myrank == 0)
        {
            cout << "*** Batchsize is not evenly divisible by the grid dimension ***" << endl;
            cout << "*** Processing " << nPasses << " vertices instead" << endl;
        }

        A.PrintInfo();
        ostringstream tinfo;
        tinfo << "Batch processing will occur " << numBatches << " times, each processing " << nBatchSize << " vertices (overall)" << endl;
        SpParHelper::Print(tinfo.str());

        vector<int> candidates;
        // Only consider non-isolated vertices
        int vertices = 0;
        int vrtxid = 0;
        int nlocpass = numBatches * subBatchSize;
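        // Choose the first nlocpass non-isolated vertices as roots: for each candidate id,
        // extract that single column of the local piece of AT and sum its nonzero count over
        // the processor column; a total of zero means the column is empty and the id is skipped.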
        while(vertices < nlocpass)
        {
            vector<int> single;
            vector<int> empty;
            single.push_back(vrtxid);   // will return an error if vrtxid exceeds the column dimension
            int locnnz = ((AT.seq())(empty,single)).getnnz();
            int totnnz;
            MPI_Allreduce( &locnnz, &totnnz, 1, MPI_INT, MPI_SUM, (AT.getcommgrid())->GetColWorld());

            if(totnnz > 0)
            {
                candidates.push_back(vrtxid);
                ++vertices;
            }
            ++vrtxid;
        }

        SpParHelper::Print("Candidates chosen, precomputation finished\n");
        double t1 = MPI_Wtime();
        vector<int> batch(subBatchSize);
        FullyDistVec<int, double> bc(AT.getcommgrid(), A.getnrow(), 0.0);

        for(int i = 0; i < numBatches; ++i)
        {
            for(int j = 0; j < subBatchSize; ++j)
            {
                batch[j] = candidates[i*subBatchSize + j];
            }

            Dist<int>::MPI_DCCols fringe = AT.SubsRefCol(batch);

            // Create nsp by setting (r,i) = 1 for the ith root vertex with label r
            // Initially only the diagonal processors have any nonzeros (because the roots were chosen that way)
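            // nsp accumulates the number of shortest paths (sigma) from each batch root to every
            // vertex, one column per root; it starts with a single 1 at each root vertex.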
            Dist<int>::DCCols * nsploc = new Dist<int>::DCCols();
            tuple<int, int, int> * mytuples = NULL;
            if(AT.getcommgrid()->GetRankInProcRow() == AT.getcommgrid()->GetRankInProcCol())
            {
                mytuples = new tuple<int, int, int>[subBatchSize];
                for(int k = 0; k < subBatchSize; ++k)
                {
                    mytuples[k] = make_tuple(batch[k], k, 1);
                }
                nsploc->Create( subBatchSize, AT.getlocalrows(), subBatchSize, mytuples);
            }
            else
            {
                nsploc->Create( 0, AT.getlocalrows(), subBatchSize, mytuples);
            }

            Dist<int>::MPI_DCCols nsp(nsploc, AT.getcommgrid());
            vector< Dist<bool>::MPI_DCCols * > bfs;     // internally keeps track of depth

            SpParHelper::Print("Exploring via BFS...\n");
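            // Frontier expansion: AT * fringe over the (plus,times) semiring carries the path
            // counts forward one BFS level, and EWiseMult(..., nsp, true) excludes vertices that
            // are already in nsp, so every vertex is discovered at most once per root.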
            while( fringe.getnnz() > 0 )
            {
                nsp += fringe;
                Dist<bool>::MPI_DCCols * level = new Dist<bool>::MPI_DCCols( fringe );
                bfs.push_back(level);

                fringe = PSpGEMM<PTBOOLINT>(AT, fringe);
                fringe = EWiseMult(fringe, nsp, true);
            }

            // Apply the unary function 1/x to every element in the matrix;
            // 1/x works because no explicit zeros are stored in the sparse matrix nsp
            Dist<double>::MPI_DCCols nspInv = nsp;
            nspInv.Apply([](double x) { return 1.0 / x; });

            // create a dense matrix with all 1's
            DenseParMat<int, double> bcu(1.0, AT.getcommgrid(), fringe.getlocalrows(), fringe.getlocalcols() );

            SpParHelper::Print("Tallying...\n");
            // BC update for all vertices except the sources
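            // Brandes-style dependency accumulation, deepest BFS level first:
            //   delta(v) += sigma(v)/sigma(w) * (1 + delta(w))   for every BFS child w of v.
            // bcu holds 1 + delta (it was initialized to all ones) and nspInv holds 1/sigma;
            // the SpGEMM with A followed by the two EWiseMult calls restricts the contributions
            // to the parents in level j-1 and rescales them by sigma(v).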
            for(int j = bfs.size()-1; j > 0; --j)
            {
                Dist<double>::MPI_DCCols w = EWiseMult( *bfs[j], nspInv, false);
                w.EWiseScale(bcu);

                Dist<double>::MPI_DCCols product = PSpGEMM<PTBOOLDOUBLE>(A, w);
                product = EWiseMult(product, *bfs[j-1], false);
                product = EWiseMult(product, nsp, false);

                bcu += product;
            }
            for(int j = 0; j < bfs.size(); ++j)
            {
                delete bfs[j];
            }

            SpParHelper::Print("Adding bc contributions...\n");
            bc += FullyDistVec<int, double>(bcu.Reduce(Row, plus<double>(), 0.0));  // pack along rows
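            // Reduce(Row, plus) sums each vertex's accumulated dependency over all roots in this
            // batch; the resulting distributed vector is added to the running bc scores.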
        }
        bc.Apply([nPasses](double x) { return x - nPasses; });     // subtract nPasses from every bc score (because bcu was initialized to all 1's)

        double t2 = MPI_Wtime();
        double TEPS = (nPasses * static_cast<float>(A.getnnz())) / (t2-t1);
        if( myrank == 0)
        {
            cout << "Computation finished" << endl;
            fprintf(stdout, "%.6lf seconds elapsed for %d starting vertices\n", t2-t1, nPasses);
            fprintf(stdout, "TEPS score is: %.6lf\n", TEPS);
        }
        if(argc > 4)    // the output file argument is optional
        {
            ofstream output(argv[4]);
            bc.SaveGathered(output, 0);
            output.close();
        }
    }

    // make sure the destructors for all objects are called before MPI_Finalize()
    MPI_Finalize();
    return 0;
}