Reductions.h
#ifndef _REDUCTIONS_H_
#define _REDUCTIONS_H_


#include <mpi.h>
#include <sys/time.h>
#include <iostream>
#include <iomanip>
#include <functional>
#include <algorithm>
#include <vector>
#include <string>
#include <sstream>


#include "CombBLAS/CombBLAS.h"
#include "Glue.h"
#include "CCGrid.h"

namespace combblas {

/***************************************************************************
 * Distribute a local m/sqrt(p) x n/sqrt(p) matrix (represented by a list of
 * tuples) across the c layers, so that each processor along the third
 * dimension receives c submatrices of size m/sqrt(p) x n/(c*sqrt(p)), one
 * from each layer. The received submatrices are then merged into a single
 * m/sqrt(p) x n/(c*sqrt(p)) matrix (e.g., with c = 4 layers, every layer
 * keeps one quarter of its original columns).
 * Note: the input tuples are deleted by this function.
 * Inputs:
 *   fibWorld: communicator along the third dimension (one rank per layer)
 *   localmerged: input array of tuples, which will be distributed across layers
 * Output: array of tuples obtained after distributing across layers and
 *   merging locally on the receiving processor
 ***************************************************************************/

template <typename SR, typename IT, typename NT>
SpTuples<IT,NT> * ParallelReduce_Alltoall_threaded(MPI_Comm & fibWorld, SpTuples<IT,NT> * & localmerged)
{
    double comp_begin, comm_begin, comp_time = 0, comm_time = 0;
    int fprocs, fibrank;
    MPI_Comm_size(fibWorld, &fprocs);
    MPI_Comm_rank(fibWorld, &fibrank);
    IT mdim = localmerged->getnrow();
    IT ndim = localmerged->getncol();
    if(fprocs == 1)
    {
        return localmerged;
    }
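
    // With more than one layer, the locally merged tuples are split into fprocs
    // column chunks, exchanged along the fiber, and re-merged on the receiving side.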

    // ------------ find splitters to distribute across layers -----------
    comp_begin = MPI_Wtime();
    std::vector<int> send_sizes(fprocs);
    std::vector<int> recv_sizes(fprocs);
    std::vector<int> recv_offsets(fprocs);
    std::vector<int> send_offsets = findColSplitters<int>(localmerged, fprocs);
    for(int i=0; i<fprocs; i++)
    {
        send_sizes[i] = send_offsets[i+1] - send_offsets[i];
    }
    comp_time += (MPI_Wtime() - comp_begin);


    // ------------ Communicate counts -----------
    comm_begin = MPI_Wtime();
    MPI_Alltoall(send_sizes.data(), 1, MPI_INT, recv_sizes.data(), 1, MPI_INT, fibWorld);
    comm_time += (MPI_Wtime() - comm_begin);
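
    // Tuples are shipped as raw bytes: a contiguous datatype of sizeof(std::tuple<IT,IT,NT>)
    // MPI_CHARs stands in for one (row, column, value) triple.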
    MPI_Datatype MPI_triple;
    MPI_Type_contiguous(sizeof(std::tuple<IT,IT,NT>), MPI_CHAR, &MPI_triple);
    MPI_Type_commit(&MPI_triple);


    // ------------ Allocate memory to receive data -----------
    comp_begin = MPI_Wtime();
    int recv_count = 0;
    for(int i = 0; i < fprocs; i++)
    {
        recv_count += recv_sizes[i];
    }
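    // Raw storage is sufficient here: the buffer is filled directly by MPI_Alltoallv below,
    // so no std::tuple constructors need to run before the receive.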
    std::tuple<IT,IT,NT> * recvbuf = static_cast<std::tuple<IT, IT, NT>*> (::operator new (sizeof(std::tuple<IT, IT, NT>[recv_count])));

    recv_offsets[0] = 0;
    for(int i = 1; i < fprocs; i++)
    {
        recv_offsets[i] = recv_offsets[i-1] + recv_sizes[i-1];
    }
    comp_time += (MPI_Wtime() - comp_begin);


    // ------------ Communicate split tuples -----------
    comm_begin = MPI_Wtime();
    MPI_Alltoallv(localmerged->tuples, send_sizes.data(), send_offsets.data(), MPI_triple, recvbuf, recv_sizes.data(), recv_offsets.data(), MPI_triple, fibWorld); // WARNING: is this big enough? (counts and displacements are 32-bit ints)
    comm_time += (MPI_Wtime() - comm_begin);


    // -------- update column indices of split tuples ----------
    comp_begin = MPI_Wtime();
    IT ndimSplit = ndim/fprocs;
    if(fibrank == (fprocs-1))
        ndimSplit = ndim - ndimSplit * fibrank;
    IT coloffset = fibrank * ndimSplit;
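    // Received tuples still carry column indices relative to the full n/sqrt(p)-wide
    // local strip; subtracting coloffset makes them local to this layer's slice.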
#pragma omp parallel for
    for(int k=0; k<recv_count; k++)
    {
        std::get<1>(recvbuf[k]) = std::get<1>(recvbuf[k]) - coloffset;
    }


    // -------- create vector of SpTuples for MultiwayMerge ----------
    std::vector< SpTuples<IT,NT>* > lists;
    for(int i=0; i<fprocs; ++i)
    {
        SpTuples<IT, NT>* spTuples = new SpTuples<IT, NT>(recv_sizes[i], mdim, ndimSplit, &recvbuf[recv_offsets[i]], true); // if needed, pass an empty object of proper dimension
        lists.push_back(spTuples);
    }

    // -------- merge received tuples ----------
    SpTuples<IT,NT> * globalmerged = MultiwayMerge<SR>(lists, mdim, ndimSplit, false);

    comp_time += (MPI_Wtime() - comp_begin);
    comp_reduce_layer += comp_time;   // global timers, defined in mpipspgemm.cpp
    comm_reduce += comm_time;


    ::operator delete(recvbuf);
    delete localmerged; // not sure if we can call ::operator delete here

    return globalmerged;
}


template <typename NT, typename IT>
SpDCCols<IT,NT> * ReduceAll_threaded(std::vector< SpTuples<IT,NT>* > & unreducedC, CCGrid & CMG)
{
    typedef PlusTimesSRing<double, double> PTDD;
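    // The merges below use the plus-times semiring over double, so duplicate
    // nonzeros contributed by different SUMMA stages are summed.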
    IT mdim = unreducedC[0]->getnrow();
    IT ndim = unreducedC[0]->getncol();

    // ------ merge list of tuples from n/sqrt(p) stages of SUMMA -------
    double loc_beg1 = MPI_Wtime();
    //SpTuples<IT, NT>* localmerged = multiwayMerge(unreducedC, true);
    SpTuples<IT, NT>* localmerged = MultiwayMerge<PTDD>(unreducedC, mdim, ndim, true);
    comp_reduce += (MPI_Wtime() - loc_beg1);

    // scatter local tuples across layers
    SpTuples<IT,NT> * mergedSpTuples = ParallelReduce_Alltoall_threaded<PTDD>(CMG.fiberWorld, localmerged);

    loc_beg1 = MPI_Wtime();
    // TODO: this is not a good constructor; change it back to an SpTuples-based constructor
    SpDCCols<IT,NT> * reducedC = new SpDCCols<IT,NT>(mergedSpTuples->getnrow(), mergedSpTuples->getncol(), mergedSpTuples->getnnz(), mergedSpTuples->tuples, false);
    comp_result += (MPI_Wtime() - loc_beg1);
    delete mergedSpTuples; // too expensive
    return reducedC;
}
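
/* Usage sketch (illustrative only): after the SUMMA stages of a 3D SpGEMM have filled a
 * vector of per-stage partial results, the cross-layer reduction is invoked roughly as
 * follows. The names 'stageOutputs' and 'CMG' are caller-side assumptions, not defined here.
 *
 *   // given: a CCGrid CMG describing the 3D process grid (set up elsewhere), and
 *   // std::vector< SpTuples<int64_t,double>* > stageOutputs filled by the SUMMA stages
 *   SpDCCols<int64_t,double> * C = ReduceAll_threaded(stageOutputs, CMG);
 *   // ... use the reduced, layer-local result C ...
 *   delete C;
 */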

} // namespace combblas

#endif