#ifndef _BFS_FRIENDS_H_
#define _BFS_FRIENDS_H_

template <class IT, class NT, class DER>
class SpParMat;	// forward declaration
/**
 * Multithreaded SpMV with a sparse vector; the assembly of the outgoing
 * buffers sendindbuf/sendnumbuf is done here as well.
 */
template <typename IT, typename VT>
void dcsc_gespmv_threaded_setbuffers(const SpDCCols<IT,bool> & A, const int32_t * indx, const VT * numx, int32_t nnzx,
				     int32_t * sendindbuf, VT * sendnumbuf, int * cnts, int * dspls, int p_c)
{
	if(A.getnnz() > 0 && nnzx > 0)
	{
		int splits = A.getnsplit();
		if(splits > 0)
		{
			std::vector< std::vector<int32_t> > indy(splits);
			std::vector< std::vector< VT > > numy(splits);
			int32_t nlocrows = static_cast<int32_t>(A.getnrow());
			int32_t perpiece = nlocrows / splits;

			#pragma omp parallel for
			for(int i=0; i<splits; ++i)
			{
				if(i != splits-1)
					SpMXSpV_ForThreading<BFSsring>(*(A.GetDCSC(i)), perpiece, indx, numx, nnzx, indy[i], numy[i], i*perpiece);
				else	// the last split absorbs the remainder rows
					SpMXSpV_ForThreading<BFSsring>(*(A.GetDCSC(i)), nlocrows - perpiece*i, indx, numx, nnzx, indy[i], numy[i], i*perpiece);
			}

			int32_t perproc = nlocrows / p_c;
			int32_t last_rec = p_c-1;

			// record the recipient of each split's last entry (-1 for an empty split)
			std::vector<int32_t> end_recs(splits);
			for(int i=0; i<splits; ++i)
			{
				if(indy[i].empty())
					end_recs[i] = -1;
				else
					end_recs[i] = std::min(indy[i].back() / perproc, last_rec);
			}

			int ** loc_rec_cnts = new int*[splits];
			#pragma omp parallel for
			for(int i=0; i<splits; ++i)
			{
				loc_rec_cnts[i] = new int[p_c]();	// zero-initialized
				if(!indy[i].empty())
				{
					int32_t cur_rec = std::min( indy[i].front() / perproc, last_rec);
					int32_t lastdata = (cur_rec+1) * perproc;	// one past the last index owned by cur_rec
					for(typename std::vector<int32_t>::iterator it = indy[i].begin(); it != indy[i].end(); ++it)
					{
						if( ( (*it) >= lastdata ) && cur_rec != last_rec)
						{
							cur_rec = std::min( (*it) / perproc, last_rec);
							lastdata = (cur_rec+1) * perproc;
						}
						++loc_rec_cnts[i][cur_rec];
					}
				}
			}
			#pragma omp parallel for
			for(int i=0; i<splits; ++i)
			{
				if(!indy[i].empty())
				{
					// entries are sorted, so if the first and last entries of this split
					// go to the same recipient, the whole split does
					int32_t beg_rec = std::min( indy[i].front() / perproc, last_rec);
					int32_t alreadysent = 0;	// portion of beg_rec's buffer filled by previous splits
					for(int before = i-1; before >= 0; before--)
						alreadysent += loc_rec_cnts[before][beg_rec];

					if(beg_rec == end_recs[i])	// fast case: a single recipient
					{
						std::transform(indy[i].begin(), indy[i].end(), indy[i].begin(), std::bind2nd(std::minus<int32_t>(), perproc*beg_rec));
						std::copy(indy[i].begin(), indy[i].end(), sendindbuf + dspls[beg_rec] + alreadysent);
						std::copy(numy[i].begin(), numy[i].end(), sendnumbuf + dspls[beg_rec] + alreadysent);
					}
					else	// slow case: multiple recipients
					{
						int32_t cur_rec = beg_rec;
						int32_t lastdata = (cur_rec+1) * perproc;
						for(typename std::vector<int32_t>::iterator it = indy[i].begin(); it != indy[i].end(); ++it)
						{
							if( ( (*it) >= lastdata ) && cur_rec != last_rec )
							{
								cur_rec = std::min( (*it) / perproc, last_rec);
								lastdata = (cur_rec+1) * perproc;
								// no previous split has sent anything to this new recipient yet
								alreadysent = 0;
							}
							sendindbuf[ dspls[cur_rec] + alreadysent ] = (*it) - perproc*cur_rec;	// convert to receiver's local index
							sendnumbuf[ dspls[cur_rec] + (alreadysent++) ] = *(numy[i].begin() + (it - indy[i].begin()));
						}
					}
				}
			}
			// accumulate the per-recipient counts serially and deallocate
			for(int i=0; i<splits; ++i)
			{
				for(int j=0; j<p_c; ++j)
					cnts[j] += loc_rec_cnts[i][j];
				delete [] loc_rec_cnts[i];
			}
			delete [] loc_rec_cnts;
		}
		else
		{
			std::cout << "Something is wrong, splits should be nonzero for multithreaded execution" << std::endl;
		}
	}
}
/**
 * Step 3 of the sparse SpMV algorithm, without the semiring (BFS only).
 */
template <typename VT, typename IT, typename UDER>
void LocalSpMV(const SpParMat<IT,bool,UDER> & A, int rowneighs, OptBuf<int32_t,VT> & optbuf, int32_t * & indacc, VT * & numacc, int * sendcnt, int accnz)
{
#ifdef TIMING
	double t0=MPI_Wtime();
#endif
	if(optbuf.totmax > 0)	// graph500 optimization enabled
	{
		if(A.spSeq->getnsplit() > 0)
		{
			// optbuf.{inds,nums,dspls} and sendcnt are pre-allocated and only filled here
			generic_gespmv_threaded_setbuffers< Select2ndSRing<bool, VT, VT> >(*(A.spSeq), indacc, numacc, (int32_t) accnz, optbuf.inds, optbuf.nums, sendcnt, optbuf.dspls, rowneighs);
		}
		else
		{
			SpMXSpV< Select2ndSRing<bool, VT, VT> >(*((A.spSeq)->GetInternal()), (int32_t) A.getlocalrows(), indacc, numacc,
					accnz, optbuf.inds, optbuf.nums, sendcnt, optbuf.dspls, rowneighs);
		}
		DeleteAll(indacc, numacc);
	}
	else
	{
		SpParHelper::Print("BFS-only (no semiring) function works only with optimization buffers\n");
	}
#ifdef TIMING
	double t1=MPI_Wtime();
	cblas_localspmvtime += (t1-t0);
#endif
}
/**
 * Merge the received contributions into the output sparse vector y.
 */
template <typename IU, typename VT>
void MergeContributions(FullyDistSpVec<IU,VT> & y, int * & recvcnt, int * & rdispls, int32_t * & recvindbuf, VT * & recvnumbuf, int rowneighs)
{
#ifdef TIMING
	double t0=MPI_Wtime();
#endif
	// free the memory of y, in case it was aliased
	std::vector<IU>().swap(y.ind);
	std::vector<VT>().swap(y.num);

#ifndef HEAPMERGE
	// Alternative 1: deduplicate with a bitmap, then sort
	IU ysize = y.MyLocLength();	// the local length is only O(n/p)
	bool * isthere = new bool[ysize];
	std::vector< std::pair<IU,VT> > ts_pairs;
	std::fill_n(isthere, ysize, false);

	// the minimum entry for each index always comes from the smallest-rank
	// sender, so a linear sweep over the received buffers suffices
	for(int i=0; i<rowneighs; ++i)
	{
		for(int j=0; j<recvcnt[i]; ++j)
		{
			int32_t index = recvindbuf[rdispls[i] + j];
			if(!isthere[index])
			{
				ts_pairs.push_back(std::make_pair(index, recvnumbuf[rdispls[i] + j]));
				isthere[index] = true;
			}
		}
	}
	DeleteAll(isthere, recvindbuf, recvnumbuf);
	__gnu_parallel::sort(ts_pairs.begin(), ts_pairs.end());
	int nnzy = ts_pairs.size();
	y.ind.resize(nnzy);
	y.num.resize(nnzy);
	for(int i=0; i<nnzy; ++i)
	{
		y.ind[i] = ts_pairs[i].first;
		y.num[i] = ts_pairs[i].second;
	}
#else
	// Alternative 2: heap-merge the rowneighs sorted runs
	int32_t hsize = 0;
	int32_t inf = std::numeric_limits<int32_t>::min();
	int32_t sup = std::numeric_limits<int32_t>::max();
	KNHeap< int32_t, int32_t > sHeap(sup, inf);
	int * processed = new int[rowneighs]();
	for(int32_t i=0; i<rowneighs; ++i)
	{
		if(recvcnt[i] > 0)
		{
			// key = index; value = source id (i)
			sHeap.insert(recvindbuf[rdispls[i]], i);
			++hsize;
		}
	}
	if(hsize > 0)
	{
		int32_t key, locv;
		sHeap.deleteMin(&key, &locv);
		y.ind.push_back( static_cast<IU>(key));
		y.num.push_back(recvnumbuf[rdispls[locv]]);	// the very first entry cannot be a duplicate

		if( (++(processed[locv])) < recvcnt[locv] )
			sHeap.insert(recvindbuf[rdispls[locv]+processed[locv]], locv);
		else
			--hsize;
	}
	while(hsize > 0)
	{
		int32_t key, locv;
		sHeap.deleteMin(&key, &locv);
		IU deref = rdispls[locv] + processed[locv];
		if(y.ind.back() != static_cast<IU>(key))	// skip duplicates; y.ind is surely nonempty here
		{
			y.ind.push_back(static_cast<IU>(key));
			y.num.push_back(recvnumbuf[deref]);
		}
		if( (++(processed[locv])) < recvcnt[locv] )
			sHeap.insert(recvindbuf[rdispls[locv]+processed[locv]], locv);
		else
			--hsize;
	}
	DeleteAll(processed, recvindbuf, recvnumbuf);
#endif
	DeleteAll(recvcnt, rdispls);
#ifdef TIMING
	double t1=MPI_Wtime();
	cblas_mergeconttime += (t1-t0);
#endif
}
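// Editor's sketch (not part of the original header): the heap-merge branch
// above is a k-way merge of rowneighs sorted runs with duplicate suppression.
// The same idea expressed with std::priority_queue instead of KNHeap, over
// hypothetical in-memory runs:
#if 0
#include <cstdio>
#include <queue>
#include <utility>
#include <vector>

int main()
{
	// each run is sorted ascending; duplicates may appear across runs
	std::vector<std::vector<int>> runs = {{1, 4, 7}, {2, 4, 9}, {0, 7}};
	// min-heap of (next value, run id)
	std::priority_queue<std::pair<int,int>, std::vector<std::pair<int,int>>, std::greater<>> heap;
	std::vector<size_t> pos(runs.size(), 0);
	for (size_t i = 0; i < runs.size(); ++i)
		if (!runs[i].empty()) heap.push({runs[i][0], (int)i});

	std::vector<int> merged;
	while (!heap.empty()) {
		auto [key, r] = heap.top(); heap.pop();
		if (merged.empty() || merged.back() != key)	// keep the first occurrence only
			merged.push_back(key);
		if (++pos[r] < runs[r].size())
			heap.push({runs[r][pos[r]], r});
	}
	for (int v : merged) std::printf("%d ", v);	// prints: 0 1 2 4 7 9
}
#endif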
/**
 * This is essentially an SpMV specialized for BFS: it lacks a general
 * semiring, selecting the columns of A adjacent to the frontier and keeping
 * the minimum contribution per index during the merge. A must be boolean;
 * the vectors carry values of type VT and indices of type IT.
 */
template <typename VT, typename IT, typename UDER>
FullyDistSpVec<IT,VT> SpMV(const SpParMat<IT,bool,UDER> & A, const FullyDistSpVec<IT,VT> & x, OptBuf<int32_t,VT> & optbuf)
{
	CheckSpMVCompliance(A, x);
	optbuf.MarkEmpty();

	MPI_Comm World = x.commGrid->GetWorld();
	MPI_Comm ColWorld = x.commGrid->GetColWorld();
	MPI_Comm RowWorld = x.commGrid->GetRowWorld();

	int accnz;
	int32_t trxlocnz;
	IT lenuntil;
	int32_t *trxinds, *indacc;
	VT *trxnums, *numacc;

#ifdef TIMING
	double t0=MPI_Wtime();
#endif
	TransposeVector(World, x, trxlocnz, lenuntil, trxinds, trxnums, true);
#ifdef TIMING
	double t1=MPI_Wtime();
	cblas_transvectime += (t1-t0);
#endif

	// deallocates trxinds (and potentially trxnums); allocates indacc/numacc
	AllGatherVector(ColWorld, trxlocnz, lenuntil, trxinds, trxnums, indacc, numacc, accnz, true);

	int rowneighs; MPI_Comm_size(RowWorld, &rowneighs);
	int * sendcnt = new int[rowneighs]();

	LocalSpMV(A, rowneighs, optbuf, indacc, numacc, sendcnt, accnz);	// deallocates indacc/numacc

	int * rdispls = new int[rowneighs];
	int * recvcnt = new int[rowneighs];
	MPI_Alltoall(sendcnt, 1, MPI_INT, recvcnt, 1, MPI_INT, RowWorld);	// exchange the counts

	// receive displacements are exact, whereas send displacements have slack
	rdispls[0] = 0;
	for(int i=0; i<rowneighs-1; ++i)
	{
		rdispls[i+1] = rdispls[i] + recvcnt[i];
	}
	int totrecv = std::accumulate(recvcnt, recvcnt+rowneighs, 0);
	int32_t * recvindbuf = new int32_t[totrecv];
	VT * recvnumbuf = new VT[totrecv];

#ifdef TIMING
	double t2=MPI_Wtime();
#endif
	if(optbuf.totmax > 0)	// graph500 optimization enabled
	{
		MPI_Alltoallv(optbuf.inds, sendcnt, optbuf.dspls, MPIType<int32_t>(), recvindbuf, recvcnt, rdispls, MPIType<int32_t>(), RowWorld);
		MPI_Alltoallv(optbuf.nums, sendcnt, optbuf.dspls, MPIType<VT>(), recvnumbuf, recvcnt, rdispls, MPIType<VT>(), RowWorld);
		delete [] sendcnt;
	}
	else
	{
		SpParHelper::Print("BFS-only (no semiring) function works only with optimization buffers\n");
	}
#ifdef TIMING
	double t3=MPI_Wtime();
	cblas_alltoalltime += (t3-t2);
#endif

	FullyDistSpVec<IT,VT> y(x.getcommgrid(), A.getnrow());
	MergeContributions(y, recvcnt, rdispls, recvindbuf, recvnumbuf, rowneighs);
	return y;
}
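// Editor's sketch (not part of the original header): a hypothetical top-down
// BFS iteration built on this SpMV, in the style of the CombBLAS Graph500
// drivers. The names Aeff, fringe, parents, optbuf, and source are assumed
// to be set up by the caller; this is illustrative, not the library's exact
// driver code:
#if 0
fringe.SetElement(source, source);	// seed the frontier with the BFS root
parents.SetElement(source, source);
while (fringe.getnnz() > 0) {
	fringe.setNumToInd();			// value of each entry := its own index
	fringe = SpMV(Aeff, fringe, optbuf);	// discover neighbors of the frontier
	fringe = EWiseMult(fringe, parents, true, (int64_t)-1);	// prune already-visited vertices
	parents.Set(fringe);			// record the newly discovered parents
}
#endif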
/**
 * Compute SpColIter starting positions for each sub-step of the bottom-up
 * carousel (and for each thread within a sub-step, when THREADED).
 */
template <typename VT, typename IT, typename UDER>
SpDCCols<int,bool>::SpColIter* CalcSubStarts(SpParMat<IT,bool,UDER> & A, FullyDistSpVec<IT,VT> & x, BitMapCarousel<IT,VT> & done)
{
	std::shared_ptr<CommGrid> cg = A.getcommgrid();
	IT rowuntil = x.LengthUntil();
	MPI_Comm RowWorld = cg->GetRowWorld();
	MPI_Bcast(&rowuntil, 1, MPIType<IT>(), 0, RowWorld);
	int numcols = cg->GetGridCols();
	SpDCCols<int,bool>::SpColIter colit = A.seq().begcol();
#ifdef THREADED
	SpDCCols<int,bool>::SpColIter * starts = new SpDCCols<int,bool>::SpColIter[numcols*cblas_splits + 1];
	for(int c=0; c<numcols; c++) {
		IT curr_sub_start = done.GetGlobalStartOfLocal(c) - rowuntil;
		IT next_sub_start = done.GetGlobalEndOfLocal(c) - rowuntil;
		IT sub_range = next_sub_start - curr_sub_start;
		IT per_thread = (sub_range + cblas_splits - 1) / cblas_splits;
		IT curr_thread_start = curr_sub_start;
		for(int t=0; t<cblas_splits; t++) {
			while(colit.colid() < curr_thread_start) {
				++colit;
			}
			starts[c*cblas_splits + t] = colit;
			curr_thread_start = std::min(curr_thread_start + per_thread, next_sub_start);
		}
	}
	starts[numcols*cblas_splits] = A.seq().endcol();
#else
	SpDCCols<int,bool>::SpColIter * starts = new SpDCCols<int,bool>::SpColIter[numcols + 1];
	for(int c=0; c<numcols; c++) {
		IT next_start = done.GetGlobalStartOfLocal(c) - rowuntil;
		while(colit.colid() < next_start) {
			++colit;
		}
		starts[c] = colit;
	}
	starts[numcols] = A.seq().endcol();
#endif
	return starts;
}
/**
 * Exchange parent updates with the shifted neighbors along the process row,
 * then apply the received updates locally.
 */
template <typename VT, typename IT>
void UpdateParents(MPI_Comm & RowWorld, std::pair<IT,IT> * updates, int num_updates, FullyDistVec<IT,VT> & parents, int source, int dest, BitMapFringe<int64_t,int64_t> & bm_fringe)
{
	int send_words = num_updates<<1, recv_words;	// each pair<IT,IT> travels as two IT words
	MPI_Status status;
	MPI_Sendrecv(&send_words, 1, MPI_INT, dest, PUPSIZE,
				 &recv_words, 1, MPI_INT, source, PUPSIZE, RowWorld, &status);
	std::pair<IT,IT> * recv_buff = new std::pair<IT,IT>[recv_words>>1];
	MPI_Sendrecv(updates, send_words, MPIType<IT>(), dest, PUPDATA,
				 recv_buff, recv_words, MPIType<IT>(), source, PUPDATA, RowWorld, &status);

#ifdef THREADED
#pragma omp parallel for
#endif
	for(int i=0; i<recv_words>>1; i++) {
		parents.SetLocalElement(recv_buff[i].first, recv_buff[i].second);
	}
	bm_fringe.IncrementNumSet(recv_words>>1);
	delete[] recv_buff;
}
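// Editor's sketch (not part of the original header): UpdateParents uses the
// classic two-phase MPI_Sendrecv pattern between shifted ring neighbors,
// exchanging sizes first so the payload receive buffer can be sized exactly.
// A minimal self-contained version with hypothetical int payloads:
#if 0
#include <mpi.h>
#include <vector>

int main(int argc, char ** argv)
{
	MPI_Init(&argc, &argv);
	int rank, np;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
	MPI_Comm_size(MPI_COMM_WORLD, &np);

	int shift = 1;	// analogous to the sub_step-dependent slice shift
	int dest   = (rank + shift) % np;
	int source = (rank - shift + np) % np;

	std::vector<int> payload(rank + 1, rank);	// ranks send differently sized data
	int send_count = (int)payload.size(), recv_count;
	MPI_Status status;
	// phase 1: exchange sizes
	MPI_Sendrecv(&send_count, 1, MPI_INT, dest, 0,
				 &recv_count, 1, MPI_INT, source, 0, MPI_COMM_WORLD, &status);
	// phase 2: exchange the payloads themselves
	std::vector<int> recv_buff(recv_count);
	MPI_Sendrecv(payload.data(), send_count, MPI_INT, dest, 1,
				 recv_buff.data(), recv_count, MPI_INT, source, 1, MPI_COMM_WORLD, &status);
	MPI_Finalize();
}
#endif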
template <typename VT, typename IT, typename UDER>
void BottomUpStep(SpParMat<IT,bool,UDER> & A, FullyDistSpVec<IT,VT> & x, BitMapFringe<int64_t,int64_t> & bm_fringe,
				  FullyDistVec<IT,VT> & parents, BitMapCarousel<IT,VT> & done, SpDCCols<int,bool>::SpColIter * starts)
{
	std::shared_ptr<CommGrid> cg = A.getcommgrid();
	MPI_Comm World = cg->GetWorld();
	MPI_Comm ColWorld = cg->GetColWorld();
	MPI_Comm RowWorld = cg->GetRowWorld();

	// compute the global index offsets of the transposed vector
	MPI_Status status;
	IT rowuntil = x.LengthUntil(), my_coluntil = x.LengthUntil(), coluntil;
	int diagneigh = cg->GetComplementRank();
	MPI_Sendrecv(&my_coluntil, 1, MPIType<IT>(), diagneigh, TROST, &coluntil, 1, MPIType<IT>(), diagneigh, TROST, World, &status);
	MPI_Bcast(&coluntil, 1, MPIType<IT>(), 0, ColWorld);
	MPI_Bcast(&rowuntil, 1, MPIType<IT>(), 0, RowWorld);

	// gather the frontier into a bitmap
	BitMap * frontier = bm_fringe.TransposeGather();

	// buffer setup: per-thread staging buffers for parent updates
	const int buff_size = 8192;
	std::pair<IT,IT> * local_update_heads[cblas_splits];
	for(int t=0; t<cblas_splits; t++)
		local_update_heads[t] = new std::pair<IT,IT>[buff_size];

	int numcols = cg->GetGridCols();
	int mycol = cg->GetRankInProcRow();
	std::pair<IT,IT> * parent_updates = new std::pair<IT,IT>[done.SizeOfChunk()<<1];

	for(int sub_step=0; sub_step<numcols; sub_step++) {
		int num_updates = 0;
		IT sub_start = done.GetGlobalStartOfLocal();
		int dest_slice = (mycol + sub_step) % numcols;
		int source_slice = (mycol - sub_step + numcols) % numcols;
#ifdef BOTTOMUPTIME
		double t1 = MPI_Wtime();
#endif
#ifdef THREADED
#pragma omp parallel
		{
			int id = omp_get_thread_num();
			int num_locals = 0;
			SpDCCols<int,bool>::SpColIter colit, colit_end;
			SpDCCols<int,bool>::SpColIter::NzIter nzit, nzit_end;
			std::pair<IT,IT> * local_updates = local_update_heads[id];
			colit_end = starts[dest_slice*cblas_splits + id + 1];
			for(colit = starts[dest_slice*cblas_splits + id]; colit != colit_end; ++colit) {
				int32_t local_row_ind = colit.colid();
				IT row = local_row_ind + rowuntil;
				if(!done.GetBit(row)) {	// skip vertices that already found a parent
					nzit_end = A.seq().endnz(colit);
					for(nzit = A.seq().begnz(colit); nzit != nzit_end; ++nzit) {
						int32_t local_col_ind = nzit.rowid();
						IT col = local_col_ind + coluntil;
						if(frontier->get_bit(local_col_ind)) {
							// flush the thread-local buffer when full
							if(num_locals == buff_size) {
								int copy_start = __sync_fetch_and_add(&num_updates, buff_size);
								std::copy(local_updates, local_updates + buff_size, parent_updates + copy_start);
								num_locals = 0;
							}
							done.SetBit(row);
							local_updates[num_locals++] = std::make_pair(row-sub_start, col);
							break;	// the first hit is the parent
						}
					}
				}
			}
			// flush the remaining thread-local updates
			int copy_start = __sync_fetch_and_add(&num_updates, num_locals);
			std::copy(local_updates, local_updates + num_locals, parent_updates + copy_start);
		}
#else
		SpDCCols<int,bool>::SpColIter colit, colit_end;
		SpDCCols<int,bool>::SpColIter::NzIter nzit, nzit_end;
		colit_end = starts[dest_slice+1];
		for(colit = starts[dest_slice]; colit != colit_end; ++colit)
		{
			int32_t local_row_ind = colit.colid();
			IT row = local_row_ind + rowuntil;
			if(!done.GetBit(row))
			{
				nzit_end = A.seq().endnz(colit);
				for(nzit = A.seq().begnz(colit); nzit != nzit_end; ++nzit)
				{
					int32_t local_col_ind = nzit.rowid();
					IT col = local_col_ind + coluntil;
					if(frontier->get_bit(local_col_ind))
					{
						done.SetBit(row);
						parent_updates[num_updates++] = std::make_pair(row-sub_start, col);
						break;	// the first hit is the parent
					}
				}
			}
		}
#endif
#ifdef BOTTOMUPTIME
		double t2 = MPI_Wtime();
#endif
		// rotate the carousel and exchange this sub-step's updates
		done.RotateAlongRow();
		UpdateParents(RowWorld, parent_updates, num_updates, parents, source_slice, dest_slice, bm_fringe);
	}
	done.UpdateFringe(bm_fringe);
	for(int t=0; t<cblas_splits; t++)
		delete[] local_update_heads[t];
	delete[] parent_updates;
}

#endif	// _BFS_FRIENDS_H_
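// Editor's sketch (not part of the original header): BottomUpStep is the
// bottom-up half of a direction-optimizing BFS (Beamer et al.). A
// hypothetical driver that switches between the top-down SpMV and the
// bottom-up carousel based on frontier size; the threshold alpha, the
// LoadFromSpVec/UpdateSpVec conversion helpers, and the surrounding objects
// are illustrative assumptions only:
#if 0
SpDCCols<int,bool>::SpColIter * starts = CalcSubStarts(Aeff, fringe, done);
while (fringe.getnnz() > 0) {
	if (fringe.getnnz() > Aeff.getnnz() / alpha) {
		// large frontier: bottom-up, every undiscovered vertex scans for a parent
		bm_fringe.LoadFromSpVec(fringe);	// hypothetical conversion helper
		BottomUpStep(Aeff, fringe, bm_fringe, parents, done, starts);
		bm_fringe.UpdateSpVec(fringe);		// hypothetical conversion helper
	} else {
		// small frontier: top-down, expand only the frontier's adjacencies
		fringe.setNumToInd();
		fringe = SpMV(Aeff, fringe, optbuf);
		fringe = EWiseMult(fringe, parents, true, (int64_t)-1);
		parents.Set(fringe);
	}
}
delete [] starts;
#endif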