COMBINATORIAL_BLAS  1.6
OldReductions.h
/*
 * This file contains currently obsolete functions.
 * They are kept in case they are needed in the future.
 */

#ifndef _OLD_REDUCTIONS_H_
#define _OLD_REDUCTIONS_H_

#include <mpi.h>
#include <tuple>
#include <cstdint>
#include <climits>

using std::tuple;
using std::get;
using std::make_tuple;

// SpParHelper and the comm_reduce timer are expected to be provided by the including
// translation unit (comm_reduce is defined in mpipspgemm.cpp).

// localmerged is invalidated in all processes after this recursive function
// globalmerged is valid only on the fibWorld root (rank 0) upon exit
// (a usage sketch follows this function)
template <typename SR>
void ParallelReduce(MPI_Comm & fibWorld, tuple<int32_t,int32_t,double> * & localmerged,
                    MPI_Datatype & MPI_triple, tuple<int32_t,int32_t,double> * & globalmerged,
                    int inputnnz, int & outputnnz)
{
    int fprocs, frank;
    MPI_Comm_size(fibWorld, &fprocs);
    MPI_Comm_rank(fibWorld, &frank);
    if(fprocs == 1)
    {
        globalmerged = localmerged;
        localmerged = NULL;
        outputnnz = inputnnz;
        return;
    }
    else if(fprocs % 2 != 0)
    {
        SpParHelper::Print("Not an even number of processes, can't merge\n");
        return;
    }

    int color = frank % 2;
    int key   = frank / 2;
    MPI_Comm halfWorld;
    MPI_Comm_split(fibWorld, color, key, &halfWorld);    // odd-even split

    if(color == 0)    // even numbered - receiver
    {
        MPI_Status status;
        int hissize = 0;
        MPI_Recv(&hissize, 1, MPI_INT, frank+1, 1, fibWorld, &status);

        tuple<int32_t,int32_t,double> * recvdata = new tuple<int32_t,int32_t,double>[hissize];

        double reduce_beg = MPI_Wtime();
        MPI_Recv(recvdata, hissize, MPI_triple, frank+1, 1, fibWorld, &status);
        comm_reduce += (MPI_Wtime() - reduce_beg);

        int i = 0, j = 0, k = 0;
        tuple<int32_t,int32_t,double> * mergeddata = new tuple<int32_t,int32_t,double>[inputnnz + hissize];

        while(i < inputnnz && j < hissize)
        {
            // both inputs are in ascending order, first by column and then by row
            if(get<1>(localmerged[i]) > get<1>(recvdata[j]))
            {
                mergeddata[k] = recvdata[j++];
            }
            else if(get<1>(localmerged[i]) < get<1>(recvdata[j]))
            {
                mergeddata[k] = localmerged[i++];
            }
            else    // columns are equal
            {
                if(get<0>(localmerged[i]) > get<0>(recvdata[j]))
                {
                    mergeddata[k] = recvdata[j++];
                }
                else if(get<0>(localmerged[i]) < get<0>(recvdata[j]))
                {
                    mergeddata[k] = localmerged[i++];
                }
                else    // row and column are both equal: combine with the semiring addition
                {
                    mergeddata[k] = make_tuple(get<0>(localmerged[i]), get<1>(recvdata[j]), SR::add(get<2>(recvdata[j]), get<2>(localmerged[i])));
                    ++i; ++j;
                }
            }
            ++k;    // in any case, one more entry added to result
        }
        while(i < inputnnz)    // copy any remaining local entries
            mergeddata[k++] = localmerged[i++];
        while(j < hissize)     // copy any remaining received entries
            mergeddata[k++] = recvdata[j++];

        delete [] recvdata;
        delete [] localmerged;
        localmerged = NULL;
        return ParallelReduce<SR>(halfWorld, mergeddata, MPI_triple, globalmerged, k, outputnnz);    // k is the new input nnz
    }
    else    // odd numbered - sender (does not recurse further)
    {
        MPI_Send(&inputnnz, 1, MPI_INT, frank-1, 1, fibWorld);
        MPI_Send(localmerged, inputnnz, MPI_triple, frank-1, 1, fibWorld);
        delete [] localmerged;
        localmerged = NULL;
    }
}
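
// A minimal usage sketch (not part of the original CombBLAS code). It assumes a bare-bones
// semiring with a static add(), and it describes the triple to MPI as an opaque block of
// bytes; the real callers of this header build their semiring and MPI datatype elsewhere.
// All names below (PlusSemiring, reduce_example, triples, ...) are illustrative only. Since
// the recursion halves the communicator at every level and requires an even size each time,
// the sketch also assumes the size of fibWorld is a power of two.
struct PlusSemiring
{
    static double add(double a, double b) { return a + b; }
};

inline void reduce_example(MPI_Comm fibWorld, tuple<int32_t,int32_t,double> * triples, int nnz)
{
    // Ship each tuple as sizeof(tuple) raw bytes; this is only safe because every rank
    // runs the same binary and therefore agrees on the tuple layout.
    MPI_Datatype MPI_triple;
    MPI_Type_contiguous((int) sizeof(tuple<int32_t,int32_t,double>), MPI_BYTE, &MPI_triple);
    MPI_Type_commit(&MPI_triple);

    tuple<int32_t,int32_t,double> * globalmerged = NULL;
    int outputnnz = 0;
    ParallelReduce<PlusSemiring>(fibWorld, triples, MPI_triple, globalmerged, nnz, outputnnz);
    // On return, triples has been deleted and set to NULL on every rank;
    // globalmerged and outputnnz are meaningful only on rank 0 of fibWorld.

    MPI_Type_free(&MPI_triple);
    delete [] globalmerged;    // non-root ranks hold NULL here, so this is a no-op for them
}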

// localmerged is invalidated in all processes after this recursive function
// globalmerged is valid on all processes upon exit
// (a usage sketch and a binary-search variant of the column splitting follow this function)
template <typename SR, typename IT, typename NT>
void ParallelReduce_Alltoall(MPI_Comm & fibWorld, tuple<IT,IT,NT> * & localmerged,
                             MPI_Datatype & MPI_triple, tuple<IT,IT,NT> * & globalmerged,
                             IT inputnnz, IT & outputnnz, IT ncols)
{
    int fprocs;
    MPI_Comm_size(fibWorld, &fprocs);
    if(fprocs == 1)
    {
        globalmerged = localmerged;
        localmerged = NULL;
        outputnnz = inputnnz;
        return;
    }
    int send_sizes[fprocs];
    int recv_sizes[fprocs];
    // this could be made more efficient, either by a binary search or by guessing and then
    // correcting (see the sketch after this function)
    //MPI_Barrier(MPI_COMM_WORLD);
    double loc_beg1 = MPI_Wtime();
    int target = 0;
    int cols_per_proc = (ncols + fprocs - 1) / fprocs;
    int split_point = cols_per_proc;
    int send_offsets[fprocs+1];
    send_offsets[0] = 0;

    for( int i = 0; i < inputnnz; i++ )
    {
        // a column block may be empty, so advance past every boundary this entry crosses
        while( std::get<1>(localmerged[i]) >= split_point )
        {
            send_offsets[++target] = i;
            split_point += cols_per_proc;
        }
    }
    while(target < fprocs) send_offsets[++target] = inputnnz;
    for(int i = 0; i < fprocs; i++)
    {
        send_sizes[i] = send_offsets[i+1] - send_offsets[i];
    }
    /*
    for( int i = 0; i < inputnnz; i++ ) {
        if( std::get<1>(localmerged[i]) >= split_point ) {
            if( target == 0 )
                send_sizes[target] = i;
            else {
                send_sizes[target] = i - send_offsets[target];
            }
            send_offsets[target+1] = i;
            target++;
            split_point += cols_per_proc;
        }
    }
    send_sizes[fprocs-1] = inputnnz - send_offsets[fprocs-1];
    */
    //MPI_Barrier(MPI_COMM_WORLD);
    //comp_reduce += (MPI_Wtime() - loc_beg1);

    double reduce_beg = MPI_Wtime();
    MPI_Alltoall( send_sizes, 1, MPI_INT, recv_sizes, 1, MPI_INT, fibWorld);
    //MPI_Barrier(MPI_COMM_WORLD);
    comm_reduce += (MPI_Wtime() - reduce_beg);

    int recv_count = 0;
    for( int i = 0; i < fprocs; i++ )
        recv_count += recv_sizes[i];
    tuple<IT,IT,NT> * recvbuf = new tuple<IT,IT,NT>[recv_count];

    int recv_offsets[fprocs];
    recv_offsets[0] = 0;
    for( int i = 1; i < fprocs; i++ ) {
        recv_offsets[i] = recv_offsets[i-1] + recv_sizes[i-1];
    }
    //MPI_Barrier(MPI_COMM_WORLD);
    reduce_beg = MPI_Wtime();
    MPI_Alltoallv( localmerged, send_sizes, send_offsets, MPI_triple, recvbuf, recv_sizes, recv_offsets, MPI_triple, fibWorld);
    //MPI_Barrier(MPI_COMM_WORLD);
    comm_reduce += (MPI_Wtime() - reduce_beg);
    loc_beg1 = MPI_Wtime();

    // fprocs-way merge of the received, individually sorted runs
    int pos[fprocs];
    for( int i = 0; i < fprocs; i++ )
        pos[i] = recv_offsets[i];
    outputnnz = 0;
    globalmerged = new tuple<IT,IT,NT>[recv_count];

    while( true ) {
        // find the next entry in (column, row) order across all runs
        int nexti = -1;
        int r = INT_MAX;
        int c = INT_MAX;
        for( int i = 0; i < fprocs; i++ ) {
            if( pos[i] < recv_offsets[i] + recv_sizes[i] ) {
                if( std::get<1>(recvbuf[pos[i]]) < c ) {
                    c = std::get<1>(recvbuf[pos[i]]);
                    r = std::get<0>(recvbuf[pos[i]]);
                    nexti = i;
                } else if( (std::get<1>(recvbuf[pos[i]]) == c) && (std::get<0>(recvbuf[pos[i]]) < r) ) {
                    r = std::get<0>(recvbuf[pos[i]]);
                    nexti = i;
                }
            }
        }
        if( nexti == -1 )    // merge is finished
            break;

        if( outputnnz > 0 && std::get<0>(globalmerged[outputnnz-1]) == std::get<0>(recvbuf[pos[nexti]]) && std::get<1>(globalmerged[outputnnz-1]) == std::get<1>(recvbuf[pos[nexti]]) )
        {
            // same (row, column) as the previous output entry: combine with the semiring addition
            std::get<2>(globalmerged[outputnnz-1]) = SR::add( std::get<2>(globalmerged[outputnnz-1]), std::get<2>(recvbuf[pos[nexti]]) );
        }
        else {
            // make this the next entry in the output
            globalmerged[outputnnz] = recvbuf[pos[nexti]];
            outputnnz++;
        }
        pos[nexti]++;    // advance only after deciding; it was a bug when this was placed before the if statement
    }
    //MPI_Barrier(MPI_COMM_WORLD);
    //comp_reduce += (MPI_Wtime() - loc_beg1);

    delete [] recvbuf;
    delete [] localmerged;
    localmerged = NULL;
}
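
// A companion sketch for the all-to-all variant (again, not part of the original code; it
// reuses the illustrative PlusSemiring and the byte-based datatype from the sketch above).
// The differences from ParallelReduce: the function is templated on the index and value
// types, the caller must pass the global number of columns so the tuples can be split into
// one contiguous column block per rank, and on exit every rank holds the merged entries of
// its own column block in globalmerged.
inline void alltoall_reduce_example(MPI_Comm fibWorld, tuple<int64_t,int64_t,double> * triples,
                                    int64_t nnz, int64_t ncols)
{
    MPI_Datatype MPI_triple;
    MPI_Type_contiguous((int) sizeof(tuple<int64_t,int64_t,double>), MPI_BYTE, &MPI_triple);
    MPI_Type_commit(&MPI_triple);

    tuple<int64_t,int64_t,double> * globalmerged = NULL;
    int64_t outputnnz = 0;
    ParallelReduce_Alltoall<PlusSemiring>(fibWorld, triples, MPI_triple, globalmerged,
                                          nnz, outputnnz, ncols);
    // triples has been deleted and set to NULL; globalmerged[0 .. outputnnz-1] is valid
    // on every rank of fibWorld.

    MPI_Type_free(&MPI_triple);
    delete [] globalmerged;
}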
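
// The scan that builds send_offsets in ParallelReduce_Alltoall touches every nonzero. As the
// comment inside the function notes, the same boundaries could instead be found with one
// binary search per rank, because localmerged is already sorted by column. Below is a sketch
// of that alternative; SplitOffsetsByColumn is a hypothetical helper, not part of the
// original code.
#include <algorithm>    // std::lower_bound

template <typename IT, typename NT>
void SplitOffsetsByColumn(const tuple<IT,IT,NT> * localmerged, IT inputnnz,
                          IT ncols, int fprocs, int * send_offsets)
{
    IT cols_per_proc = (ncols + fprocs - 1) / fprocs;
    send_offsets[0] = 0;
    for(int p = 1; p <= fprocs; ++p)
    {
        IT split_point = p * cols_per_proc;    // first column owned by rank p
        // position of the first tuple whose column is >= split_point
        const tuple<IT,IT,NT> * it = std::lower_bound(localmerged, localmerged + inputnnz, split_point,
                [](const tuple<IT,IT,NT> & t, IT col) { return std::get<1>(t) < col; });
        send_offsets[p] = static_cast<int>(it - localmerged);
    }
    // send_sizes[i] would then be send_offsets[i+1] - send_offsets[i], exactly as in the
    // function above, at a cost of O(fprocs log inputnnz) instead of O(inputnnz).
}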

#endif