COMBINATORIAL_BLAS  1.6
psort_samplesort.h
/*
Copyright (c) 2009, David Cheng, Viral B. Shah.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

#ifndef PSORT_SAMPLE_H
#define PSORT_SAMPLE_H

#include "psort_util.h"

#define MPI_PSORT_TAG 31337

namespace vpsort {
  using namespace std;

  // Sample n_out elements from the n_in-element sequence `in` _without_ replacement.
  // Done with two sweeps of size n_out: the first sweep picks elements and
  // temporarily swaps tail elements into the picked slots, the second sweep
  // undoes the swaps, so `in` is touched but restored.
  template<typename _RandomAccessIter, typename _ValueType, typename _Distance>
  static void sample_splitters(_RandomAccessIter in, _Distance n_in,
                               _ValueType *out, long n_out) {
    if (n_out >= n_in) {
      // TODO
    } else {
      _Distance *picks = new _Distance[n_out];

      for (long i = 0; i < n_out; ++i) {
        // Pick uniformly from the still-unsampled prefix [0, n_in - i), then move
        // the last element of that prefix into the picked slot so the picked
        // element cannot be chosen again.
        picks[i] = static_cast<_Distance>((n_in - i) * drand48());
        out[i] = *(in + picks[i]);
        *(in + picks[i]) = *(in + (n_in - i - 1));
      }

      // Undo the swaps in reverse order, restoring the original contents of `in`.
      for (long i = n_out - 1; i >= 0; --i) {
        *(in + (n_in - i - 1)) = *(in + picks[i]);
        *(in + picks[i]) = out[i];
      }

      delete [] picks;
    }
  }
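
  // sample_splitters example trace (derived from the code above): with
  // in = {a, b, c, d, e} (n_in = 5), n_out = 2, and random picks {1, 3}:
  //   first sweep, i = 0: out = {b, _}, in = {a, e, c, d, e}  (in[4] moved into slot 1)
  //   first sweep, i = 1: out = {b, d}, in = {a, e, c, d, e}  (slot 3 is already the range end)
  //   second sweep undoes the swaps in reverse, restoring in = {a, b, c, d, e}.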

  // Given global partition sizes part_gsize[0..n_parts-1] and a target per-processor
  // distribution dist[0..nproc-1], write into out[0..nproc-2] the index of the last
  // partition assigned to each of the first nproc-1 processors.
  static inline void set_splitters(long *part_gsize, long n_parts, int nproc, long *dist, long *out) {
    // impl 1: get all but the last about the correct size; the last might pick up the slack
    long ind = 0, cum = 0;
    for (long i = 0; i < n_parts; ++i) {
      cum += part_gsize[i];
      if (cum > dist[ind]) {
        out[ind] = i;
        // Exclude partition i when including it overshoots dist[ind] by more than
        // excluding it undershoots.
        if (cum - dist[ind] > dist[ind] - cum + part_gsize[i]) {
          --out[ind];
          cum = part_gsize[i];
        } else {
          cum = 0;
        }
        if (++ind >= nproc - 1) break;
      }
    }
  }
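
  // set_splitters example: with nproc = 3, dist = {4, 4, 4}, and six global
  // partition sizes part_gsize = {2, 2, 2, 2, 2, 2}, the loop emits out = {1, 3}:
  // processor 0 takes partitions 0-1, processor 1 takes 2-3, and the last
  // processor picks up the remaining partitions 4-5, giving 4 elements apiece.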

  // Redistribute `current`, laid out across processors according to from_dist,
  // into `final`, laid out according to to_dist; both distributions must describe
  // the same global number of elements.
  template<typename _ValueType>
  static void redist (int *from_dist, _ValueType *current,
                      int *to_dist, _ValueType *final,
                      int rank, int nproc,
                      MPI_Datatype &MPI_valueType, MPI_Comm comm)
  {
    // starch[i][j] = number of elements processor i sends to processor j
    int **starch = new int*[nproc];

    for (int i = 0; i < nproc; ++i) {
      starch[i] = new int[nproc];
      for (int j = 0; j < nproc; ++j) {
        starch[i][j] = 0;
      }
    }

    // Walk the two distributions in lockstep, peeling off the smaller remaining
    // piece each time; the overlaps give the transfer sizes.
    int i = 0, j = 0;
    int from_delta = from_dist[0], to_delta = to_dist[0];

    while ((i < nproc) && (j < nproc)) {

      int diff = from_delta - to_delta;

      if (diff == 0) {
        starch[i][j] = from_delta;
        ++i;
        ++j;
        if (i < nproc) from_delta = from_dist[i];
        if (j < nproc) to_delta = to_dist[j];
      } else if (diff > 0) {
        starch[i][j] = to_delta;
        ++j;
        from_delta -= to_delta;
        if (j < nproc) to_delta = to_dist[j];
      } else {
        starch[i][j] = from_delta;
        ++i;
        to_delta -= from_delta;
        if (i < nproc) from_delta = from_dist[i];
      }
    }

    MPI_Request send_req[nproc], recv_req[nproc];
    MPI_Status status;

    int sendl = 0, recvl = 0;

    // Exchange with lower-ranked processors (and self): received blocks fill
    // `final` from the front; sent blocks come from the front of `current`.
    for (int p = 0; p <= rank; ++p) {

      int torecv = starch[p][rank];
      if (torecv) {
        MPI_Irecv (&final[recvl], torecv,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &recv_req[p]);
        recvl += torecv;
      }

      int tosend = starch[rank][p];
      if (tosend) {
        MPI_Isend (&current[sendl], tosend,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &send_req[p]);
        sendl += tosend;
      }
    }

    int sendr = 0, recvr = 0;

    // Exchange with higher-ranked processors: these blocks fill `final` and are
    // drawn from `current` starting at the back.
    for (int p = nproc-1; p > rank; --p) {

      int torecv = starch[p][rank];
      if (torecv) {
        recvr += torecv;
        MPI_Irecv (&final[to_dist[rank]-recvr], torecv,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &recv_req[p]);
      }

      int tosend = starch[rank][p];
      if (tosend) {
        sendr += tosend;
        MPI_Isend (&current[from_dist[rank]-sendr], tosend,
                   MPI_valueType, p, MPI_PSORT_TAG,
                   comm, &send_req[p]);
      }
    }

    // Wait only on the requests that were actually posted.
    for (int p = 0; p < nproc; ++p) {
      if ( starch[rank][p] )
        MPI_Wait(&send_req[p], &status);
      if ( starch[p][rank] )
        MPI_Wait(&recv_req[p], &status);
    }

    for (int i = 0; i < nproc; ++i)
      delete [] starch[i];
    delete [] starch;
  }
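
  // redist example: with nproc = 3, from_dist = {5, 2, 5}, and to_dist = {4, 4, 4},
  // the lockstep walk produces
  //   starch = { {4, 1, 0},
  //              {0, 2, 0},
  //              {0, 1, 4} },
  // i.e. processor 0 keeps 4 elements and sends 1 to processor 1, processor 1
  // keeps its 2, and processor 2 sends 1 to processor 1 and keeps 4. Row sums
  // match from_dist and column sums match to_dist.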

  template<typename _RandomAccessIter, typename _Compare,
           typename _SeqSortType>
  void parallel_samplesort(_RandomAccessIter first, _RandomAccessIter last,
                           _Compare comp, long *dist,
                           SeqSort<_SeqSortType> &mysort,
                           long s, long k,
                           MPI_Comm comm) {

    typedef typename iterator_traits<_RandomAccessIter>::value_type _ValueType;
    typedef typename iterator_traits<_RandomAccessIter>::difference_type _Distance;

    int nproc, rank;
    MPI_Comm_size(comm, &nproc);
    MPI_Comm_rank(comm, &rank);

    // Ship values and distances as raw bytes; _ValueType is assumed to be
    // trivially copyable.
    MPI_Datatype MPI_valueType, MPI_distanceType;
    MPI_Type_contiguous (sizeof(_ValueType), MPI_CHAR, &MPI_valueType);
    MPI_Type_commit (&MPI_valueType);
    MPI_Type_contiguous (sizeof(_Distance), MPI_CHAR, &MPI_distanceType);
    MPI_Type_commit (&MPI_distanceType);

    _Distance n_loc = last - first;

    progress (rank, 0, "Sample splitters", comm);

    // randomly pick s*k sample splitters on every processor
    long n_samp = s*k;
    _ValueType *s_splitters = new _ValueType[n_samp];
    sample_splitters(first, n_loc, s_splitters, n_samp);

    // send the sample splitters to the root
    long n_parts = nproc * k - 1;
    _ValueType *p_splitters = new _ValueType[n_parts];
    if (rank != 0) {
      MPI_Gather(s_splitters, n_samp, MPI_valueType,
                 NULL, 0, MPI_valueType, 0, comm);
    } else {
      _ValueType *gath_splitters = new _ValueType[nproc * n_samp];
      MPI_Gather(s_splitters, n_samp, MPI_valueType,
                 gath_splitters, n_samp, MPI_valueType,
                 0, comm);

      // the root sorts the sample splitters serially
      sort(gath_splitters, gath_splitters + (nproc * n_samp), comp);

      // the root then picks p*k - 1 evenly spaced splitters and broadcasts them
      double stride = (double) (nproc * n_samp) / (nproc * k);
      for (long i = 0; i < n_parts; ++i) {
        p_splitters[i] = gath_splitters[(int) (stride * (double) (i + 1))];
      }
      delete [] gath_splitters;
    }
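
    // Note: stride = (nproc*n_samp) / (nproc*k) simplifies to s, so the root keeps
    // every s-th of the nproc*s*k gathered samples as one of the nproc*k - 1
    // splitters. With the defaults s = k = nproc = p (see the convenience overload
    // below) that is p*p - 1 splitters drawn from p*p*p samples.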

    MPI_Bcast(p_splitters, n_parts, MPI_valueType, 0, comm);

    delete [] s_splitters;

    progress (rank, 1, "Partition", comm);

    // apply the splitters
    // impl1: binary search each element over the splitters
    // TODO: impl2: recurse, partitioning dfs order over splitters
    _Distance *part_ind = new _Distance[n_loc];
    long *part_lsize = new long[n_parts + 1];
    for (long i = 0; i <= n_parts; ++i) part_lsize[i] = 0;

    for (_Distance i = 0; i < n_loc; ++i) {
      // TODO: load balance when splitters have the same value
      part_ind[i] = lower_bound(p_splitters, p_splitters + n_parts,
                                *(first + i), comp)
        - p_splitters;
      ++part_lsize[part_ind[i]];
    }

    delete [] p_splitters;

    long *boundaries = new long[nproc - 1];
    if (rank != 0) {
      MPI_Reduce(part_lsize, NULL, n_parts + 1, MPI_LONG, MPI_SUM, 0, comm);
    } else {
      // determine the global partition sizes and starch-minimizing boundaries
      long *part_gsize = new long[n_parts + 1];
      MPI_Reduce(part_lsize, part_gsize, n_parts + 1, MPI_LONG, MPI_SUM, 0, comm);
      set_splitters (part_gsize, n_parts + 1, nproc, dist, boundaries);
      delete [] part_gsize;
    }
    MPI_Bcast(boundaries, nproc - 1, MPI_LONG, 0, comm);

    // send data according to the boundaries;
    // part_proc[i] says which processor partition i goes to
    long *part_proc = new long[n_parts + 1];
    int out_counts[nproc];
    for (int i = 0; i < nproc; ++i) out_counts[i] = 0;
    long cur = 0;
    for (long i = 0; i <= n_parts; ++i) {
      if (cur < nproc - 1 && i > boundaries[cur]) {
        ++cur;
      }
      part_proc[i] = cur;
      out_counts[cur] += part_lsize[i];
    }

    delete [] boundaries;
    delete [] part_lsize;

    // bucket the local elements by destination processor
    long curs[nproc];
    _ValueType **s_bufs = new _ValueType*[nproc];
    for (int i = 0; i < nproc; ++i) {
      curs[i] = 0;
      s_bufs[i] = new _ValueType[out_counts[i]];
    }
    for (_Distance i = 0; i < n_loc; ++i) {
      int dest = part_proc[part_ind[i]];
      s_bufs[dest][curs[dest]++] = *(first + i);
    }

    delete [] part_ind;
    delete [] part_proc;

    progress (rank, 2, "Alltoall", comm);

    // tell each processor how many elements to expect from every other processor
    int in_counts[nproc];
    MPI_Alltoall(out_counts, 1, MPI_INT, in_counts, 1, MPI_INT, comm);

    int offsets[nproc + 1];
    offsets[0] = 0;
    for (int i = 1; i <= nproc; ++i) {
      offsets[i] = offsets[i-1] + in_counts[i-1];
    }
    int n_loct = offsets[nproc];

    MPI_Request send_reqs[nproc];
    MPI_Request recv_reqs[nproc];
    _ValueType *trans_data = new _ValueType[n_loct];

    for (int i = 0; i < nproc; ++i) {
      MPI_Irecv(&trans_data[offsets[i]], in_counts[i],
                MPI_valueType, i, MPI_PSORT_TAG, comm, &recv_reqs[i]);
    }
    // stagger the sends (rank+1, rank+2, ...) so that not every processor
    // targets the same destination at once
    for (int i = 1; i <= nproc; ++i) {
      int dest = (rank + i) % nproc;
      MPI_Isend(s_bufs[dest], out_counts[dest], MPI_valueType,
                dest, MPI_PSORT_TAG, comm, &send_reqs[dest]);
    }

    MPI_Waitall(nproc, send_reqs, MPI_STATUSES_IGNORE);
    for (int i = 0; i < nproc; ++i) {
      delete [] s_bufs[i];
    }
    delete [] s_bufs;

    MPI_Waitall (nproc, recv_reqs, MPI_STATUSES_IGNORE);

    progress (rank, 3, "Sequential sort", comm);

    mysort.seqsort (trans_data, trans_data + n_loct, comp);

    progress (rank, 4, "Adjust boundaries", comm);

    // starch: shift elements across processor boundaries so the final layout
    // matches the requested distribution dist
    int trans_dist[nproc], dist_out[nproc];
    MPI_Allgather(&n_loct, 1, MPI_INT, trans_dist, 1, MPI_INT, comm);
    for (int i = 0; i < nproc; ++i) dist_out[i] = dist[i];
    redist(trans_dist, trans_data, dist_out, first, rank, nproc, MPI_valueType, comm);
    delete [] trans_data;

    progress (rank, 5, "Finish", comm);

    MPI_Type_free (&MPI_valueType);
    MPI_Type_free (&MPI_distanceType);
  }

  // Convenience wrapper: STL comparison (operator<), STL sequential sort,
  // and s = k = nproc samples per processor.
  template<typename _RandomAccessIter>
  void parallel_samplesort(_RandomAccessIter first, _RandomAccessIter last,
                           long *dist, MPI_Comm comm) {

    typedef typename iterator_traits<_RandomAccessIter>::value_type _ValueType;

    STLSort stl_sort;

    int nproc;
    MPI_Comm_size (comm, &nproc);
    long s = nproc, k = nproc;
    parallel_samplesort (first, last, less<_ValueType>(), dist, stl_sort, s, k, comm);
  }
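
  /* Minimal usage sketch (illustrative only; assumes the elements are stored
     contiguously so that the iterators are pointers, and that the entries of
     `dist` sum to the global element count):

       #include <mpi.h>
       #include <cstdlib>
       #include <vector>
       #include "psort_samplesort.h"

       int main(int argc, char **argv) {
         MPI_Init(&argc, &argv);
         int nproc, rank;
         MPI_Comm_size(MPI_COMM_WORLD, &nproc);
         MPI_Comm_rank(MPI_COMM_WORLD, &rank);

         const long n_loc = 1 << 20;                  // elements held by this rank
         std::vector<long> dist(nproc, n_loc);        // keep the same block sizes
         std::vector<double> data(n_loc);
         srand48(rank + 1);
         for (long i = 0; i < n_loc; ++i) data[i] = drand48();

         // After the call, rank r holds the r-th block of the globally sorted
         // sequence, with dist[r] elements.
         vpsort::parallel_samplesort(data.data(), data.data() + n_loc,
                                     dist.data(), MPI_COMM_WORLD);

         MPI_Finalize();
         return 0;
       }
  */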

} /* namespace vpsort */

#endif /* PSORT_SAMPLE_H */