#ifndef _OLD_REDUCTIONS_H_
#define _OLD_REDUCTIONS_H_

#include <mpi.h>
#include <tuple>
#include <cstdint>
#include "SpParHelper.h"	// CombBLAS helper providing SpParHelper::Print (adjust the path to your tree)

using std::tuple;
using std::get;
using std::make_tuple;

// Pairwise reduction: odd ranks send their column-sorted triples to the even
// rank below them, which merges the two runs and recurses on the halved communicator.
template <typename SR>
void ParallelReduce(MPI_Comm & fibWorld, tuple<int32_t,int32_t,double> * & localmerged,
                    MPI_Datatype & MPI_triple, tuple<int32_t,int32_t,double> * & globalmerged,
                    int inputnnz, int & outputnnz)
{
	int fprocs, frank;
	MPI_Comm_size(fibWorld, &fprocs);
	MPI_Comm_rank(fibWorld, &frank);
	if(fprocs == 1)	// nothing left to reduce with
	{
		globalmerged = localmerged;
		localmerged = NULL;
		outputnnz = inputnnz;
		return;
	}
	else if(fprocs % 2 != 0)	// the pairwise folding needs an even rank count
	{
		SpParHelper::Print("Not even sized neighbors, can't merge\n");
		return;
	}

	// Pair up ranks: even ranks (color 0) receive and keep reducing, odd ranks send and drop out.
	int color = frank % 2;
	int key = frank / 2;
	MPI_Comm halfWorld;
	MPI_Comm_split(fibWorld, color, key, &halfWorld);

	if(color == 0)	// receiver
	{
		int hissize = 0;
		MPI_Status status;
		MPI_Recv(&hissize, 1, MPI_INT, frank+1, 1, fibWorld, &status);
		tuple<int32_t,int32_t,double> * recvdata = new tuple<int32_t,int32_t,double>[hissize];

		double reduce_beg = MPI_Wtime();	// communication timer (profiling counter not shown in this excerpt)
		MPI_Recv(recvdata, hissize, MPI_triple, frank+1, 1, fibWorld, &status);

		int i = 0, j = 0, k = 0;
		tuple<int32_t,int32_t,double> * mergeddata = new tuple<int32_t,int32_t,double>[inputnnz + hissize];
		// Both runs are sorted by (column, row); two-way merge, combining
		// values with the semiring add when the coordinates coincide.
		while(i < inputnnz && j < hissize)
		{
			if(get<1>(localmerged[i]) > get<1>(recvdata[j]))
			{
				mergeddata[k] = recvdata[j++];
			}
			else if(get<1>(localmerged[i]) < get<1>(recvdata[j]))
			{
				mergeddata[k] = localmerged[i++];
			}
			else	// same column, compare rows
			{
				if(get<0>(localmerged[i]) > get<0>(recvdata[j]))
				{
					mergeddata[k] = recvdata[j++];
				}
				else if(get<0>(localmerged[i]) < get<0>(recvdata[j]))
				{
					mergeddata[k] = localmerged[i++];
				}
				else	// identical coordinates: reduce the two values
				{
					mergeddata[k] = make_tuple(get<0>(localmerged[i]), get<1>(recvdata[j]), SR::add(get<2>(recvdata[j]), get<2>(localmerged[i])));
					++i; ++j;
				}
			}
			++k;
		}
		// Drain whichever run still has leftovers.
		while(i < inputnnz)	mergeddata[k++] = localmerged[i++];
		while(j < hissize)	mergeddata[k++] = recvdata[j++];

		delete [] recvdata;
		delete [] localmerged;
		return ParallelReduce<SR>(halfWorld, mergeddata, MPI_triple, globalmerged, k, outputnnz);
	}
	else	// sender: ship size and data to the even neighbor, then drop out
	{
		MPI_Send(&inputnnz, 1, MPI_INT, frank-1, 1, fibWorld);
		MPI_Send(localmerged, inputnnz, MPI_triple, frank-1, 1, fibWorld);
		delete [] localmerged;
		localmerged = NULL;
		outputnnz = 0;	// senders hold no part of the result (reconstruction assumption)
	}
}
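// ---------------------------------------------------------------------------
// Usage sketch (not part of the original header): one way ParallelReduce might
// be driven.  PlusTimesSemiring, the packed-bytes MPI datatype, and the macro
// guard are illustrative assumptions, not CombBLAS API.  Enable by defining
// OLD_REDUCTIONS_USAGE_EXAMPLE and calling the function from an MPI program
// whose communicator has a power-of-two number of ranks.
// ---------------------------------------------------------------------------
#ifdef OLD_REDUCTIONS_USAGE_EXAMPLE
struct PlusTimesSemiring
{
	static double add(const double & a, const double & b) { return a + b; }
};

inline void ParallelReduceUsageExample(MPI_Comm fibWorld)
{
	int frank;
	MPI_Comm_rank(fibWorld, &frank);

	// Ship each triple as raw bytes; simple, though it assumes a homogeneous
	// cluster and an identical tuple layout on every rank.
	MPI_Datatype MPI_triple;
	MPI_Type_contiguous(sizeof(tuple<int32_t,int32_t,double>), MPI_CHAR, &MPI_triple);
	MPI_Type_commit(&MPI_triple);

	// One column-sorted (row, column, value) entry per rank; every rank
	// contributes to the same coordinate, so the reduced value is the rank count.
	tuple<int32_t,int32_t,double> * localmerged = new tuple<int32_t,int32_t,double>[1];
	localmerged[0] = make_tuple(0, 0, 1.0);

	tuple<int32_t,int32_t,double> * globalmerged = NULL;
	int outputnnz = 0;
	ParallelReduce<PlusTimesSemiring>(fibWorld, localmerged, MPI_triple, globalmerged, 1, outputnnz);

	// Only rank 0 (the last surviving receiver) ends up with the merged data.
	delete [] globalmerged;
	MPI_Type_free(&MPI_triple);
}
#endif // OLD_REDUCTIONS_USAGE_EXAMPLE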
// All-to-all reduction: columns are block-partitioned over the ranks of fibWorld,
// each triple is sent to the owner of its column, and the received runs are
// k-way merged locally.
template <typename SR, typename IT, typename NT>
void ParallelReduce_Alltoall(MPI_Comm & fibWorld, tuple<IT,IT,NT> * & localmerged,
                             MPI_Datatype & MPI_triple, tuple<IT,IT,NT> * & globalmerged,
                             IT inputnnz, IT & outputnnz, IT ncols)
{
	int fprocs;
	MPI_Comm_size(fibWorld, &fprocs);
	if(fprocs == 1)	// single rank: the local merge is already the global result
	{
		globalmerged = localmerged;
		localmerged = NULL;
		outputnnz = inputnnz;
		return;
	}
	int send_sizes[fprocs];
	int recv_sizes[fprocs];

	double loc_beg1 = MPI_Wtime();	// local-work timer (profiling counter not shown in this excerpt)

	// Block-partition the columns: rank p owns columns [p*cols_per_proc, (p+1)*cols_per_proc).
	int cols_per_proc = (ncols + fprocs - 1) / fprocs;
	int split_point = cols_per_proc;
	int send_offsets[fprocs+1];
	send_offsets[0] = 0;
	int target = 0;
	for(int i = 0; i < inputnnz; i++)
	{
		// localmerged is column-sorted; the while (rather than an if) also
		// advances past ranks whose column range holds no local entries.
		while(std::get<1>(localmerged[i]) >= split_point)
		{
			send_offsets[++target] = i;
			split_point += cols_per_proc;
		}
	}
	while(target < fprocs) send_offsets[++target] = inputnnz;
	for(int i = 0; i < fprocs; i++)
		send_sizes[i] = send_offsets[i+1] - send_offsets[i];
	double reduce_beg = MPI_Wtime();	// communication timer
	MPI_Alltoall(send_sizes, 1, MPI_INT, recv_sizes, 1, MPI_INT, fibWorld);

	int recv_count = 0;
	for(int i = 0; i < fprocs; i++)
		recv_count += recv_sizes[i];
	tuple<IT,IT,NT> * recvbuf = new tuple<IT,IT,NT>[recv_count];

	int recv_offsets[fprocs];
	recv_offsets[0] = 0;
	for(int i = 1; i < fprocs; i++)
		recv_offsets[i] = recv_offsets[i-1] + recv_sizes[i-1];

	reduce_beg = MPI_Wtime();
	MPI_Alltoallv(localmerged, send_sizes, send_offsets, MPI_triple,
	              recvbuf, recv_sizes, recv_offsets, MPI_triple, fibWorld);
	loc_beg1 = MPI_Wtime();	// merge timer

	int pos[fprocs];	// read cursor into each rank's received run
	for(int i = 0; i < fprocs; i++)
		pos[i] = recv_offsets[i];

	globalmerged = new tuple<IT,IT,NT>[recv_count];
	outputnnz = 0;

	// fprocs-way merge of the received (column, row)-sorted runs,
	// combining duplicate coordinates with the semiring add.
	while(true)
	{
		IT c = ncols;	// smallest column this round (ncols acts as infinity)
		IT r = 0;	// row of the current candidate
		int nexti = -1;	// run holding the candidate
		for(int i = 0; i < fprocs; i++) {
			if( pos[i] < recv_offsets[i]+recv_sizes[i] ) {
				if( std::get<1>(recvbuf[pos[i]]) < c ) {
					c = std::get<1>(recvbuf[pos[i]]);
					r = std::get<0>(recvbuf[pos[i]]);
					nexti = i;
				}
				else if( (std::get<1>(recvbuf[pos[i]]) == c) && (std::get<0>(recvbuf[pos[i]]) < r) ) {
					r = std::get<0>(recvbuf[pos[i]]);
					nexti = i;
				}
			}
		}
		if( nexti == -1 ) break;	// every run is exhausted

		if( outputnnz > 0 && std::get<0>(globalmerged[outputnnz-1]) == std::get<0>(recvbuf[pos[nexti]]) && std::get<1>(globalmerged[outputnnz-1]) == std::get<1>(recvbuf[pos[nexti]]) )
		{
			// same (row, column) as the last emitted entry: reduce the values
			std::get<2>(globalmerged[outputnnz-1]) = SR::add( std::get<2>(globalmerged[outputnnz-1]), std::get<2>(recvbuf[pos[nexti]]) );
		}
		else
		{
			globalmerged[outputnnz] = recvbuf[pos[nexti]];
			outputnnz++;
		}
		pos[nexti]++;
	}

	delete [] recvbuf;
	delete [] localmerged;
	localmerged = NULL;
}
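// ---------------------------------------------------------------------------
// Usage sketch for the all-to-all variant (not part of the original header).
// MinSemiring, the packed-bytes datatype, and this driver are illustrative
// assumptions.  Every rank contributes one entry in column 0, so rank 0 (the
// owner of the first column block) receives them all and ends up with a
// single entry holding the semiring minimum; other ranks get outputnnz == 0.
// ---------------------------------------------------------------------------
#ifdef OLD_REDUCTIONS_USAGE_EXAMPLE
struct MinSemiring
{
	static double add(const double & a, const double & b) { return (a < b) ? a : b; }
};

inline void ParallelReduceAlltoallUsageExample(MPI_Comm fibWorld, int64_t ncols)
{
	int frank;
	MPI_Comm_rank(fibWorld, &frank);

	// Same raw-bytes datatype trick as in the previous sketch.
	MPI_Datatype MPI_triple;
	MPI_Type_contiguous(sizeof(tuple<int64_t,int64_t,double>), MPI_CHAR, &MPI_triple);
	MPI_Type_commit(&MPI_triple);

	// One column-sorted entry per rank, all at coordinate (0, 0).
	tuple<int64_t,int64_t,double> * localmerged = new tuple<int64_t,int64_t,double>[1];
	localmerged[0] = make_tuple((int64_t)0, (int64_t)0, (double)(frank + 1));

	tuple<int64_t,int64_t,double> * globalmerged = NULL;
	int64_t outputnnz = 0;
	ParallelReduce_Alltoall<MinSemiring, int64_t, double>(
		fibWorld, localmerged, MPI_triple, globalmerged,
		(int64_t)1, outputnnz, ncols);

	delete [] globalmerged;
	MPI_Type_free(&MPI_triple);
}
#endif // OLD_REDUCTIONS_USAGE_EXAMPLE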
#endif	// _OLD_REDUCTIONS_H_