COMBINATORIAL_BLAS  1.6
mtSpGEMM.h
Go to the documentation of this file.
1 #ifndef _mtSpGEMM_h
2 #define _mtSpGEMM_h
3 
4 #include "CombBLAS.h"
5 
6 namespace combblas {
7 /*
8  Multithreaded prefix sum
9  Inputs:
10  in: an input array
11  size: the length of the input array "in"
12  nthreads: number of threads used to compute the prefix sum
13 
14  Output:
15  return an array of size "size+1"
16  the memory of the output array is allocated internallay
17 
18  Example:
19 
20  in = [2, 1, 3, 5]
21  out = [0, 2, 3, 6, 11]
22  */
23 template <typename T>
24 T* prefixsum(T* in, int size, int nthreads)
25 {
26  std::vector<T> tsum(nthreads+1);
27  tsum[0] = 0;
28  T* out = new T[size+1];
29  out[0] = 0;
30  T* psum = &out[1];
31 #ifdef THREADED
32 #pragma omp parallel
33 #endif
34  {
35  int ithread = 0;
36  #ifdef THREADED
37  ithread = omp_get_thread_num();
38  #endif
39 
40  T sum = 0;
41 #ifdef THREADED
42 #pragma omp for schedule(static)
43 #endif
44  for (int i=0; i<size; i++)
45  {
46  sum += in[i];
47  psum[i] = sum;
48  }
49 
50  tsum[ithread+1] = sum;
51 #ifdef THREADED
52 #pragma omp barrier
53 #endif
54  T offset = 0;
55  for(int i=0; i<(ithread+1); i++)
56  {
57  offset += tsum[i];
58  }
59 
60 #ifdef THREADED
61 #pragma omp for schedule(static)
62 #endif
63  for (int i=0; i<size; i++)
64  {
65  psum[i] += offset;
66  }
67 
68  }
69  return out;
70 }
71 
72 
73 
74 
75 // multithreaded HeapSpGEMM
76 template <typename SR, typename NTO, typename IT, typename NT1, typename NT2>
79  const SpDCCols<IT, NT2> & B,
80  bool clearA, bool clearB)
81 {
82  IT mdim = A.getnrow();
83  IT ndim = B.getncol();
84  IT nnzA = A.getnnz();
85  if(A.isZero() || B.isZero())
86  {
87  return new SpTuples<IT, NTO>(0, mdim, ndim);
88  }
89 
90 
91  Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
92  Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();
93  IT nA = A.getncol();
94  float cf = static_cast<float>(nA+1) / static_cast<float>(Adcsc->nzc);
95  IT csize = static_cast<IT>(ceil(cf)); // chunk size
96  IT * aux;
97  Adcsc->ConstructAux(nA, aux);
98 
99 
100  int numThreads = 1;
101 #ifdef THREADED
102 #pragma omp parallel
103  {
104  numThreads = omp_get_num_threads();
105  }
106 #endif
107 
108  IT* colnnzC = estimateNNZ(A, B);
109  IT* colptrC = prefixsum<IT>(colnnzC, Bdcsc->nzc, numThreads);
110  delete [] colnnzC;
111  IT nnzc = colptrC[Bdcsc->nzc];
112  std::tuple<IT,IT,NTO> * tuplesC = static_cast<std::tuple<IT,IT,NTO> *> (::operator new (sizeof(std::tuple<IT,IT,NTO>[nnzc])));
113 
114 
115 
116 
117  // thread private space for heap and colinds
118  std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);
119  std::vector<std::vector<HeapEntry<IT,NT1>>> globalheapVec(numThreads);
120 
121  for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
122  {
123  colindsVec[i].resize(nnzA/numThreads);
124  globalheapVec[i].resize(nnzA/numThreads);
125  }
126 
127 
128 #ifdef THREADED
129 #pragma omp parallel for
130 #endif
131  for(int i=0; i < Bdcsc->nzc; ++i)
132  {
133  size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
134  int myThread = 0;
135  #ifdef THREADED
136  myThread = omp_get_thread_num();
137  #endif
138  if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
139  {
140  colindsVec[myThread].resize(nnzcolB);
141  globalheapVec[myThread].resize(nnzcolB);
142  }
143 
144 
145  // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
146  // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
147  Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
148  std::pair<IT,IT> * colinds = colindsVec[myThread].data();
149  HeapEntry<IT,NT1> * wset = globalheapVec[myThread].data();
150  IT hsize = 0;
151 
152 
153  for(IT j = 0; (unsigned)j < nnzcolB; ++j) // create the initial heap
154  {
155  if(colinds[j].first != colinds[j].second) // current != end
156  {
157  wset[hsize++] = HeapEntry< IT,NT1 > (Adcsc->ir[colinds[j].first], j, Adcsc->numx[colinds[j].first]);
158  }
159  }
160  std::make_heap(wset, wset+hsize);
161 
162  IT curptr = colptrC[i];
163  while(hsize > 0)
164  {
165  std::pop_heap(wset, wset + hsize); // result is stored in wset[hsize-1]
166  IT locb = wset[hsize-1].runr; // relative location of the nonzero in B's current column
167 
168  NTO mrhs = SR::multiply(wset[hsize-1].num, Bdcsc->numx[Bdcsc->cp[i]+locb]);
169  if (!SR::returnedSAID())
170  {
171  if( (curptr > colptrC[i]) && std::get<0>(tuplesC[curptr-1]) == wset[hsize-1].key)
172  {
173  std::get<2>(tuplesC[curptr-1]) = SR::add(std::get<2>(tuplesC[curptr-1]), mrhs);
174  }
175  else
176  {
177  tuplesC[curptr++]= std::make_tuple(wset[hsize-1].key, Bdcsc->jc[i], mrhs) ;
178  }
179 
180  }
181 
182  if( (++(colinds[locb].first)) != colinds[locb].second) // current != end
183  {
184  // runr stays the same !
185  wset[hsize-1].key = Adcsc->ir[colinds[locb].first];
186  wset[hsize-1].num = Adcsc->numx[colinds[locb].first];
187  std::push_heap(wset, wset+hsize);
188  }
189  else
190  {
191  --hsize;
192  }
193  }
194  }
195 
196  if(clearA)
197  delete const_cast<SpDCCols<IT, NT1> *>(&A);
198  if(clearB)
199  delete const_cast<SpDCCols<IT, NT2> *>(&B);
200 
201  delete [] colptrC;
202  delete [] aux;
203 
204  SpTuples<IT, NTO>* spTuplesC = new SpTuples<IT, NTO> (nnzc, mdim, ndim, tuplesC, true, true);
205  return spTuplesC;
206 
207 }
208 
209 // estimate space for result of SpGEMM
210 template <typename IT, typename NT1, typename NT2>
212 {
213  IT nnzA = A.getnnz();
214  if(A.isZero() || B.isZero())
215  {
216  return NULL;
217  }
218 
219  Dcsc<IT,NT1>* Adcsc = A.GetDCSC();
220  Dcsc<IT,NT2>* Bdcsc = B.GetDCSC();
221 
222  float cf = static_cast<float>(A.getncol()+1) / static_cast<float>(Adcsc->nzc);
223  IT csize = static_cast<IT>(ceil(cf)); // chunk size
224  IT * aux;
225  Adcsc->ConstructAux(A.getncol(), aux);
226 
227 
228  int numThreads = 1;
229 #ifdef THREADED
230 #pragma omp parallel
231  {
232  numThreads = omp_get_num_threads();
233  }
234 #endif
235 
236 
237  IT* colnnzC = new IT[Bdcsc->nzc]; // nnz in every nonempty column of C
238 
239 #ifdef THREADED
240 #pragma omp parallel for
241 #endif
242  for(IT i=0; i< Bdcsc->nzc; ++i)
243  {
244  colnnzC[i] = 0;
245  }
246 
247  // thread private space for heap and colinds
248  std::vector<std::vector< std::pair<IT,IT>>> colindsVec(numThreads);
249  std::vector<std::vector<std::pair<IT,IT>>> globalheapVec(numThreads);
250 
251 
252  for(int i=0; i<numThreads; i++) //inital allocation per thread, may be an overestimate, but does not require more memoty than inputs
253  {
254  colindsVec[i].resize(nnzA/numThreads);
255  globalheapVec[i].resize(nnzA/numThreads);
256  }
257 
258 #ifdef THREADED
259 #pragma omp parallel for
260 #endif
261  for(int i=0; i < Bdcsc->nzc; ++i)
262  {
263  size_t nnzcolB = Bdcsc->cp[i+1] - Bdcsc->cp[i]; //nnz in the current column of B
264  int myThread = 0;
265 #ifdef THREADED
266  myThread = omp_get_thread_num();
267 #endif
268  if(colindsVec[myThread].size() < nnzcolB) //resize thread private vectors if needed
269  {
270  colindsVec[myThread].resize(nnzcolB);
271  globalheapVec[myThread].resize(nnzcolB);
272  }
273 
274  // colinds.first vector keeps indices to A.cp, i.e. it dereferences "colnums" vector (above),
275  // colinds.second vector keeps the end indices (i.e. it gives the index to the last valid element of A.cpnack)
276  Adcsc->FillColInds(Bdcsc->ir + Bdcsc->cp[i], nnzcolB, colindsVec[myThread], aux, csize);
277  std::pair<IT,IT> * colinds = colindsVec[myThread].data();
278  std::pair<IT,IT> * curheap = globalheapVec[myThread].data();
279  IT hsize = 0;
280 
281  // create the initial heap
282  for(IT j = 0; (unsigned)j < nnzcolB; ++j)
283  {
284  if(colinds[j].first != colinds[j].second)
285  {
286  curheap[hsize++] = std::make_pair(Adcsc->ir[colinds[j].first], j);
287  }
288  }
289  std::make_heap(curheap, curheap+hsize, std::greater<std::pair<IT,IT>>());
290 
291  IT prevRow=-1; // previously popped row from heap
292 
293  while(hsize > 0)
294  {
295  std::pop_heap(curheap, curheap + hsize, std::greater<std::pair<IT,IT>>()); // result is stored in wset[hsize-1]
296  IT locb = curheap[hsize-1].second;
297 
298  if( curheap[hsize-1].first != prevRow)
299  {
300  prevRow = curheap[hsize-1].first;
301  colnnzC[i] ++;
302  }
303 
304  if( (++(colinds[locb].first)) != colinds[locb].second) // current != end
305  {
306  curheap[hsize-1].first = Adcsc->ir[colinds[locb].first];
307  std::push_heap(curheap, curheap+hsize, std::greater<std::pair<IT,IT>>());
308  }
309  else
310  {
311  --hsize;
312  }
313  }
314  }
315 
316  delete [] aux;
317  return colnnzC;
318 }
319 
320 }
321 
322 #endif
double B
Definition: HeapEntry.h:36
IT getnnz() const
Definition: SpDCCols.h:301
IT key
Definition: HeapEntry.h:41
IT getncol() const
Definition: SpDCCols.h:300
int size
IT * ir
row indices, size nz
Definition: dcsc.h:121
bool isZero() const
Definition: SpDCCols.h:298
IT * cp
The master array, size nzc+1 (keeps column pointers)
Definition: dcsc.h:117
SpTuples< IT, NTO > * LocalSpGEMM(const SpDCCols< IT, NT1 > &A, const SpDCCols< IT, NT2 > &B, bool clearA, bool clearB)
Definition: mtSpGEMM.h:78
void FillColInds(const VT *colnums, IT nind, std::vector< std::pair< IT, IT > > &colinds, IT *aux, IT csize) const
Definition: dcsc.cpp:1211
double A
IT * estimateNNZ(const SpDCCols< IT, NT1 > &A, const SpDCCols< IT, NT2 > &B)
Definition: mtSpGEMM.h:211
NT * numx
generic values, size nz
Definition: dcsc.h:122
IT nzc
number of columns with at least one non-zero in them
Definition: dcsc.h:125
IT ConstructAux(IT ndim, IT *&aux) const
Definition: dcsc.cpp:912
IT runr
Definition: HeapEntry.h:42
Definition: CCGrid.h:4
IT getnrow() const
Definition: SpDCCols.h:299
T * prefixsum(T *in, int size, int nthreads)
Definition: mtSpGEMM.h:24
SpDCCols< IT, NT > * multiply(SpDCCols< IT, NT > &splitA, SpDCCols< IT, NT > &splitB, CCGrid &CMG, bool isBT, bool threaded)
Definition: Multiplier.h:11
IT * jc
col indices, size nzc
Definition: dcsc.h:120
Dcsc< IT, NT > * GetDCSC() const
Definition: SpDCCols.h:322
NT num
Definition: HeapEntry.h:43