23 #ifndef PSORT_SAMPLE_H 25 #define PSORT_SAMPLE_H 29 #define MPI_PSORT_TAG 31337 37 template<
typename _RandomAccessIter,
typename _ValueType,
typename _Distance>
38 static void sample_splitters(_RandomAccessIter in, _Distance n_in,
39 _ValueType *out,
long n_out) {
43 _Distance *picks =
new _Distance[n_out];
45 for (
int i=0; i<n_out; ++i) {
46 picks[i] =
static_cast<_Distance
> ((n_in - i) * drand48());
47 out[i] = *(in + picks[i]);
48 *(in + picks[i]) = *(in + (n_out - i - 1));
51 for (
int i=n_out-1; i>=0; --i) {
52 *(in + (n_out - i - 1)) = *(in + picks[i]);
53 *(in + picks[i]) = out[i];
60 static inline void set_splitters(
long *part_gsize,
long n_parts,
int nproc,
long *dist,
long *out) {
62 long ind = 0, cum = 0;
63 for (
long i=0; i<n_parts; ++i) {
65 if (cum > dist[ind]) {
67 if (cum - dist[ind] > dist[ind] - cum + part_gsize[i]) {
73 if (++ind >= nproc - 1)
break;
78 template<
typename _ValueType>
79 static void redist (
int *from_dist, _ValueType *current,
80 int *to_dist, _ValueType *
final,
82 MPI_Datatype &MPI_valueType, MPI_Comm comm)
84 int ** starch =
new int*[nproc];
86 for (
int i = 0; i < nproc; ++i) {
87 starch[i] =
new int[nproc];
88 for (
int j = 0; j < nproc; ++j) {
94 int from_delta = from_dist[0], to_delta = to_dist[0];
96 while ((i < nproc) && (j < nproc)) {
98 int diff = from_delta - to_delta;
101 starch[i][j] = from_delta;
104 from_delta = from_dist[i];
105 to_delta = to_dist[j];
106 }
else if (diff > 0) {
107 starch[i][j] = to_delta;
109 from_delta -= to_delta;
110 to_delta = to_dist[j];
112 starch[i][j] = from_delta;
114 to_delta -= from_delta;
115 from_delta = from_dist[i];
119 MPI_Request send_req[nproc], recv_req[nproc];
122 int sendl = 0, recvl = 0;
124 for (
int p = 0; p <=
rank; ++p) {
126 int torecv = starch[p][
rank];
128 MPI_Irecv (&
final[recvl], torecv,
134 int tosend = starch[
rank][p];
136 MPI_Isend (¤t[sendl], tosend,
144 int sendr = 0, recvr = 0;
146 for (
int p = nproc-1; p >
rank; --p) {
148 int torecv = starch[p][
rank];
151 MPI_Irecv (&
final[to_dist[rank]-recvr], torecv,
156 int tosend = starch[
rank][p];
159 MPI_Isend (¤t[from_dist[rank]-sendr], tosend,
165 for (
int p = 0; p < nproc; ++p) {
166 if ( starch[rank][p] )
167 MPI_Wait(&send_req[p], &status);
168 if ( starch[p][rank] )
169 MPI_Wait(&recv_req[p], &status);
172 for (
int i = 0; i < nproc; ++i)
179 template<
typename _RandomAccessIter,
typename _Compare,
180 typename _SeqSortType>
182 _Compare comp,
long *dist,
187 typedef typename iterator_traits<_RandomAccessIter>::value_type _ValueType;
188 typedef typename iterator_traits<_RandomAccessIter>::difference_type _Distance;
191 MPI_Comm_size(comm, &nproc);
192 MPI_Comm_rank(comm, &rank);
194 MPI_Datatype MPI_valueType, MPI_distanceType;
195 MPI_Type_contiguous (
sizeof(_ValueType), MPI_CHAR, &MPI_valueType);
196 MPI_Type_commit (&MPI_valueType);
197 MPI_Type_contiguous (
sizeof(_Distance), MPI_CHAR, &MPI_distanceType);
198 MPI_Type_commit (&MPI_distanceType);
200 _Distance n_loc = last - first;
202 progress (rank, 0,
"Sample splitters", comm);
206 _ValueType *s_splitters =
new _ValueType[n_samp];
207 sample_splitters(first, n_loc, s_splitters, n_samp);
210 long n_parts = nproc * k - 1;
211 _ValueType *p_splitters =
new _ValueType[n_parts];
213 MPI_Gather(s_splitters, n_samp, MPI_valueType,
214 NULL, 0, MPI_valueType, 0, comm);
216 _ValueType *gath_splitters =
new _ValueType[nproc * n_samp];
217 MPI_Gather(s_splitters, n_samp, MPI_valueType,
218 gath_splitters, n_samp, MPI_valueType,
222 sort(gath_splitters, gath_splitters + (nproc * n_samp), comp);
225 double stride = (double) (nproc * n_samp) / (nproc * k);
226 for (
int i=0; i<n_parts; ++i) {
227 p_splitters[i] = gath_splitters[(int) (stride * (
double) (i + 1))];
229 delete [] gath_splitters;
232 MPI_Bcast(p_splitters, n_parts, MPI_valueType, 0, comm);
234 delete [] s_splitters;
236 progress (rank, 1,
"Partition", comm);
241 _Distance *part_ind =
new _Distance[n_loc];
242 long *part_lsize =
new long[n_parts + 1];
243 for (
long i=0; i<=n_parts; ++i) part_lsize[i] = 0;
245 for (
long i=0; i<n_loc; ++i) {
247 part_ind[i] = lower_bound(p_splitters, p_splitters + n_parts, *(first + i))
249 ++part_lsize[part_ind[i]];
252 delete [] p_splitters;
254 long *boundaries =
new long[nproc - 1];
256 MPI_Reduce(part_lsize, NULL, n_parts + 1, MPI_LONG, MPI_SUM, 0, comm);
259 long *part_gsize =
new long[n_parts + 1];
260 MPI_Reduce(part_lsize, part_gsize, n_parts + 1, MPI_LONG, MPI_SUM, 0, comm);
261 set_splitters (part_gsize, n_parts + 1, nproc, dist, boundaries);
262 delete [] part_gsize;
264 MPI_Bcast(boundaries, nproc - 1, MPI_LONG, 0, comm);
268 long *part_proc =
new long[n_parts + 1];
269 int out_counts[nproc];
270 for (
int i=0; i<nproc; ++i) out_counts[i] = 0;
272 for (
long i=0; i<=n_parts; ++i) {
273 if (cur < nproc - 1 && i > boundaries[cur]) {
277 out_counts[cur] += part_lsize[i];
280 delete [] boundaries;
281 delete [] part_lsize;
284 _ValueType **s_bufs =
new _ValueType*[nproc];
285 for (
int i=0; i<nproc; ++i) {
287 s_bufs[i] =
new _ValueType[out_counts[i]];
289 for (
int i=0; i<n_loc; ++i) {
290 int dest = part_proc[part_ind[i]];
291 s_bufs[dest][curs[dest]++] = *(first + i);
297 progress (rank, 2,
"Alltoall", comm);
300 int in_counts[nproc];
301 MPI_Alltoall(out_counts, 1, MPI_INT, in_counts, 1, MPI_INT, comm);
303 int offsets[nproc + 1];
305 for (
int i=1; i<=nproc; ++i) {
306 offsets[i] = offsets[i-1] + in_counts[i-1];
308 int n_loct = offsets[nproc];
310 MPI_Request send_reqs[nproc];
311 MPI_Request recv_reqs[nproc];
312 _ValueType *trans_data =
new _ValueType[n_loct];
314 for (
int i=0; i<nproc; ++i) {
315 MPI_Irecv(&trans_data[offsets[i]], in_counts[i],
318 for (
int i=1; i<=nproc; ++i) {
319 int dest = (rank + i) % nproc;
320 MPI_Isend(s_bufs[dest], out_counts[dest], MPI_valueType,
324 MPI_Waitall(nproc, send_reqs, MPI_STATUS_IGNORE);
325 for (
int i=0; i < nproc; ++i) {
330 MPI_Waitall (nproc, recv_reqs, MPI_STATUS_IGNORE);
332 progress (rank, 3,
"Sequential sort", comm);
334 mysort.
seqsort (trans_data, trans_data + n_loct, comp);
336 progress (rank, 4,
"Adjust boundaries", comm);
339 int trans_dist[nproc], dist_out[nproc];
340 MPI_Allgather(&n_loct, 1, MPI_INT, trans_dist, 1, MPI_INT, comm);
341 for (
int i=0; i<nproc; ++i) dist_out[i] = dist[i];
342 redist(trans_dist, trans_data, dist_out, first, rank, nproc, MPI_valueType, comm);
343 delete [] trans_data;
345 progress (rank, 5,
"Finish", comm);
347 MPI_Type_free (&MPI_valueType);
350 template<
typename _RandomAccessIter>
352 long *dist, MPI_Comm comm) {
354 typedef typename iterator_traits<_RandomAccessIter>::value_type _ValueType;
359 MPI_Comm_size (comm, &nproc);
360 long s = nproc, k = nproc;
void seqsort(_ValueType *first, _ValueType *last, _Compare comp)
void parallel_samplesort(_RandomAccessIter first, _RandomAccessIter last, _Compare comp, long *dist, SeqSort< _SeqSortType > &mysort, long s, long k, MPI_Comm comm)