COMBINATORIAL_BLAS  1.6
MultiwayMerge.h
Go to the documentation of this file.
1 #include "CombBLAS.h"
2 
3 namespace combblas {
4 
5 
6 /***************************************************************************
7  * Find indices of column splitters in a list of tuple in parallel.
8  * Inputs:
9  * tuples: an array of SpTuples each tuple is (rowid, colid, val)
10  * nsplits: number of splits requested
11  * Output:
12  * splitters: An array of size (nsplits+1) storing the starts and ends of split tuples.
13  * different type used for output since we might need int or IT
14  ***************************************************************************/
15 
16 template <typename RT, typename IT, typename NT>
17 std::vector<RT> findColSplitters(SpTuples<IT,NT> * & spTuples, int nsplits)
18 {
19  std::vector<RT> splitters(nsplits+1);
20  splitters[0] = static_cast<RT>(0);
22 #ifdef THREADED
23 #pragma omp parallel for
24 #endif
25  for(int i=1; i< nsplits; i++)
26  {
27  IT cur_col = i * (spTuples->getncol()/nsplits);
28  std::tuple<IT,IT,NT> search_tuple(0, cur_col, 0);
29  std::tuple<IT,IT,NT>* it = std::lower_bound (spTuples->tuples, spTuples->tuples + spTuples->getnnz(), search_tuple, comp);
30  splitters[i] = (RT) (it - spTuples->tuples);
31  }
32  splitters[nsplits] = spTuples->getnnz();
33 
34  return splitters;
35 }
36 
37 
38 // Symbolic serial merge : only estimates nnz
39 template<class IT, class NT>
40 IT SerialMergeNNZ( const std::vector<SpTuples<IT,NT> *> & ArrSpTups)
41 {
42  int nlists = ArrSpTups.size();
43  ColLexiCompare<IT,int> heapcomp;
44  std::vector<std::tuple<IT, IT, int>> heap(nlists);
45  std::vector<IT> curptr(nlists, static_cast<IT>(0));
46  IT hsize = 0;
47  for(int i=0; i< nlists; ++i)
48  {
49  if(ArrSpTups[i]->getnnz()>0)
50  {
51  heap[hsize++] = std::make_tuple(std::get<0>(ArrSpTups[i]->tuples[0]), std::get<1>(ArrSpTups[i]->tuples[0]), i);
52  }
53 
54  }
55  std::make_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
56 
57  std::tuple<IT, IT, NT> curTuple;
58  IT estnnz = 0;
59  while(hsize > 0)
60  {
61  std::pop_heap(heap.data(), heap.data() + hsize, std::not2(heapcomp)); // result is stored in heap[hsize-1]
62  int source = std::get<2>(heap[hsize-1]);
63  if( (estnnz ==0) || (std::get<0>(curTuple) != std::get<0>(heap[hsize-1])) || (std::get<1>(curTuple) != std::get<1>(heap[hsize-1])))
64  {
65  curTuple = ArrSpTups[source]->tuples[curptr[source]];
66  estnnz++;
67  }
68  curptr[source]++;
69  if(curptr[source] != ArrSpTups[source]->getnnz()) // That array has not been depleted
70  {
71  heap[hsize-1] = std::make_tuple(std::get<0>(ArrSpTups[source]->tuples[curptr[source]]),
72  std::get<1>(ArrSpTups[source]->tuples[curptr[source]]), source);
73  std::push_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
74  }
75  else
76  {
77  --hsize;
78  }
79  }
80  return estnnz;
81 }
82 
83 
84 /*
85  "Internal function" called by MultiwayMerge inside threaded region.
86  The merged list is stored in a preallocated buffer ntuples
87  Never called from outside.
88  Assumption1: the input lists are already column sorted
89  Assumption2: at least two lists are passed to this function
90  Assumption3: the input and output lists are to be deleted by caller
91  */
92 
93 template<class SR, class IT, class NT>
94 void SerialMerge( const std::vector<SpTuples<IT,NT> *> & ArrSpTups, std::tuple<IT, IT, NT> * ntuples)
95 {
96  int nlists = ArrSpTups.size();
97  ColLexiCompare<IT,int> heapcomp;
98  std::vector<std::tuple<IT, IT, int>> heap(nlists); // if performance issue, create this outside of threaded region
99  std::vector<IT> curptr(nlists, static_cast<IT>(0));
100  IT estnnz = 0;
101  IT hsize = 0;
102  for(int i=0; i< nlists; ++i)
103  {
104  if(ArrSpTups[i]->getnnz()>0)
105  {
106  estnnz += ArrSpTups[i]->getnnz();
107  heap[hsize++] = std::make_tuple(std::get<0>(ArrSpTups[i]->tuples[0]), std::get<1>(ArrSpTups[i]->tuples[0]), i);
108  }
109 
110  }
111  std::make_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
112  IT cnz = 0;
113 
114  while(hsize > 0)
115  {
116  std::pop_heap(heap.data(), heap.data() + hsize, std::not2(heapcomp)); // result is stored in heap[hsize-1]
117  int source = std::get<2>(heap[hsize-1]);
118 
119  if( (cnz != 0) &&
120  ((std::get<0>(ntuples[cnz-1]) == std::get<0>(heap[hsize-1])) && (std::get<1>(ntuples[cnz-1]) == std::get<1>(heap[hsize-1]))) )
121  {
122  std::get<2>(ntuples[cnz-1]) = SR::add(std::get<2>(ntuples[cnz-1]), ArrSpTups[source]->numvalue(curptr[source]++));
123  }
124  else
125  {
126  ntuples[cnz++] = ArrSpTups[source]->tuples[curptr[source]++];
127  }
128 
129  if(curptr[source] != ArrSpTups[source]->getnnz()) // That array has not been depleted
130  {
131  heap[hsize-1] = std::make_tuple(std::get<0>(ArrSpTups[source]->tuples[curptr[source]]),
132  std::get<1>(ArrSpTups[source]->tuples[curptr[source]]), source);
133  std::push_heap(heap.data(), heap.data()+hsize, std::not2(heapcomp));
134  }
135  else
136  {
137  --hsize;
138  }
139  }
140 }
141 
142 
143 
144 // Performs a balanced merge of the array of SpTuples
145 // Assumes the input parameters are already column sorted
146 template<class SR, class IT, class NT>
147 SpTuples<IT, NT>* MultiwayMerge( std::vector<SpTuples<IT,NT> *> & ArrSpTups, IT mdim = 0, IT ndim = 0, bool delarrs = false )
148 {
149 
150  int nlists = ArrSpTups.size();
151  if(nlists == 0)
152  {
153  return new SpTuples<IT,NT>(0, mdim, ndim); //empty mxn SpTuples
154  }
155  if(nlists == 1)
156  {
157  if(delarrs) // steal data from input, and don't delete input
158  {
159  return ArrSpTups[0];
160  }
161  else // copy input to output
162  {
163  std::tuple<IT, IT, NT>* mergeTups = static_cast<std::tuple<IT, IT, NT>*>
164  (::operator new (sizeof(std::tuple<IT, IT, NT>[ArrSpTups[0]->getnnz()])));
165 #ifdef THREADED
166 #pragma omp parallel for
167 #endif
168  for(int i=0; i<ArrSpTups[0]->getnnz(); i++)
169  mergeTups[i] = ArrSpTups[0]->tuples[i];
170 
171  return new SpTuples<IT,NT> (ArrSpTups[0]->getnnz(), mdim, ndim, mergeTups, true);
172  }
173  }
174 
175  // ---- check correctness of input dimensions ------
176  for(int i=0; i< nlists; ++i)
177  {
178  if((mdim != ArrSpTups[i]->getnrow()) || ndim != ArrSpTups[i]->getncol())
179  {
180  std::cerr << "Dimensions of SpTuples do not match on multiwayMerge()" << std::endl;
181  return new SpTuples<IT,NT>(0,0,0);
182  }
183  }
184 
185  int nthreads = 1;
186 #ifdef THREADED
187 #pragma omp parallel
188  {
189  nthreads = omp_get_num_threads();
190  }
191 #endif
192  int nsplits = 4*nthreads; // oversplit for load balance
193  nsplits = std::min(nsplits, (int)ndim); // we cannot split a column
194  std::vector< std::vector<IT> > colPtrs;
195  for(int i=0; i< nlists; i++)
196  {
197  colPtrs.push_back(findColSplitters<IT>(ArrSpTups[i], nsplits)); // in parallel
198  }
199 
200  std::vector<IT> mergedNnzPerSplit(nsplits);
201  std::vector<IT> inputNnzPerSplit(nsplits);
202 // ------ estimate memory requirement after merge in each split ------
203 #ifdef THREADED
204 #pragma omp parallel for schedule(dynamic)
205 #endif
206  for(int i=0; i< nsplits; i++) // for each part
207  {
208  std::vector<SpTuples<IT,NT> *> listSplitTups(nlists);
209  IT t = static_cast<IT>(0);
210  for(int j=0; j< nlists; ++j)
211  {
212  IT curnnz= colPtrs[j][i+1] - colPtrs[j][i];
213  listSplitTups[j] = new SpTuples<IT, NT> (curnnz, mdim, ndim, ArrSpTups[j]->tuples + colPtrs[j][i], true);
214  t += colPtrs[j][i+1] - colPtrs[j][i];
215  }
216  mergedNnzPerSplit[i] = SerialMergeNNZ(listSplitTups);
217  inputNnzPerSplit[i] = t;
218  }
219 
220  std::vector<IT> mdisp(nsplits+1,0);
221  for(int i=0; i<nsplits; ++i)
222  mdisp[i+1] = mdisp[i] + mergedNnzPerSplit[i];
223  IT mergedNnzAll = mdisp[nsplits];
224 
225 
226 #ifdef COMBBLAS_DEBUG
227  IT inputNnzAll = std::accumulate(inputNnzPerSplit.begin(), inputNnzPerSplit.end(), static_cast<IT>(0));
228  double ratio = inputNnzAll / (double) mergedNnzAll;
229  std::ostringstream outs;
230  outs << "Multiwaymerge: inputNnz/mergedNnz = " << ratio << std::endl;
231  SpParHelper::Print(outs.str());
232 #endif
233 
234 
235  // ------ allocate memory outside of the parallel region ------
236  std::tuple<IT, IT, NT> * mergeBuf = static_cast<std::tuple<IT, IT, NT>*> (::operator new (sizeof(std::tuple<IT, IT, NT>[mergedNnzAll])));
237 
238  // ------ perform merge in parallel ------
239 #ifdef THREADED
240 #pragma omp parallel for schedule(dynamic)
241 #endif
242  for(int i=0; i< nsplits; i++) // serially merge part by part
243  {
244  std::vector<SpTuples<IT,NT> *> listSplitTups(nlists);
245  for(int j=0; j< nlists; ++j)
246  {
247  IT curnnz= colPtrs[j][i+1] - colPtrs[j][i];
248  listSplitTups[j] = new SpTuples<IT, NT> (curnnz, mdim, ndim, ArrSpTups[j]->tuples + colPtrs[j][i], true);
249  }
250  SerialMerge<SR>(listSplitTups, mergeBuf + mdisp[i]);
251  }
252 
253  for(int i=0; i< nlists; i++)
254  {
255  if(delarrs)
256  delete ArrSpTups[i]; // May be expensive for large local matrices
257  }
258  return new SpTuples<IT, NT> (mergedNnzAll, mdim, ndim, mergeBuf, true, true);
259 }
260 
261 }
std::tuple< IT, IT, NT > * tuples
Definition: SpTuples.h:272
IT SerialMergeNNZ(const std::vector< SpTuples< IT, NT > *> &ArrSpTups)
Definition: MultiwayMerge.h:40
static void Print(const std::string &s)
int64_t getnnz() const
Definition: SpTuples.h:269
std::vector< RT > findColSplitters(SpTuples< IT, NT > *&spTuples, int nsplits)
Definition: MultiwayMerge.h:17
void SerialMerge(const std::vector< SpTuples< IT, NT > *> &ArrSpTups, std::tuple< IT, IT, NT > *ntuples)
Definition: MultiwayMerge.h:94
IT getncol() const
Definition: SpTuples.h:268
Definition: CCGrid.h:4
SpTuples< IT, NT > * MultiwayMerge(std::vector< SpTuples< IT, NT > *> &ArrSpTups, IT mdim=0, IT ndim=0, bool delarrs=false)