openCARP
Doxygen code documentation for the open cardiac electrophysiology simulator openCARP
SF_parallel_utils.h
// ----------------------------------------------------------------------------
// openCARP is an open cardiac electrophysiology simulator.
//
// Copyright (C) 2020 openCARP project
//
// This program is licensed under the openCARP Academic Public License (APL)
// v1.0: You can use and redistribute it and/or modify it in non-commercial
// academic environments under the terms of APL as published by the openCARP
// project v1.0, or (at your option) any later version. Commercial use requires
// a commercial license (info@opencarp.org).
//
// This program is distributed without any warranty; see the openCARP APL for
// more details.
//
// You should have received a copy of the openCARP APL along with this program
// and can find it online: http://www.opencarp.org/license
// ----------------------------------------------------------------------------

#ifndef _SF_PARALLEL_UTILS_H
#define _SF_PARALLEL_UTILS_H

#include <mpi.h>

#include "SF_container.h"
#include "SF_globals.h"
#include "SF_mesh_io.h"
#include "SF_vector.h"

namespace SF {
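
// Sort index values in parallel, ascending across the ranks: on return,
// out_idx holds this rank's block of the globally sorted index set.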
template<class T>
void sort_parallel(MPI_Comm comm, const vector<T> & idx, vector<T> & out_idx)
{
  int size, rank;
  MPI_Comm_size(comm, &size); MPI_Comm_rank(comm, &rank);

  // determine global min and max indices
  T gmax = global_max(idx, comm);
  T gmin = global_min(idx, comm);

  // block size
  T bsize = (gmax - gmin) / size + 1;

  // distribute tuples uniquely and linearly ascending across the ranks ----------------
  vector<T> dest(idx.size()), perm;
  interval(perm, 0, idx.size());

  // find a destination for every tuple
  for(size_t i=0; i<dest.size(); i++)
    dest[i] = (idx[i] - gmin) / bsize;

  // find permutation to sort tuples in the send buffer
  binary_sort_copy(dest, perm);

  // fill send buffer
  vector<T> snd_idx(idx.size());
  for(size_t i=0; i<perm.size(); i++)
    snd_idx[i] = idx[perm[i]];

  // communicate
  commgraph<size_t> grph;
  grph.configure(dest, comm);

  size_t rsize = sum(grph.rcnt);
  out_idx.resize(rsize);

  MPI_Exchange(grph, snd_idx, out_idx, comm);

  // sort the received values locally
  binary_sort(out_idx);
}

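// Illustrative usage sketch (not part of the original header): obtain a
// globally sorted, block-distributed copy of locally held global indices.
// The index type `long` and the function name are assumptions of the example.
inline void example_sort_indices(MPI_Comm comm, const vector<long> & loc_idx)
{
  vector<long> sorted_idx;
  sort_parallel(comm, loc_idx, sorted_idx);
  // sorted_idx is now sorted locally, with blocks ascending across the ranks
}

// Sort (index, value) pairs in parallel: the entries of val are redistributed
// and permuted together with their indices.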
template<class T, class V>
void sort_parallel(MPI_Comm comm, const vector<T> & idx, const vector<V> & val,
                   vector<T> & out_idx, vector<V> & out_val)
{
  int size, rank;
  MPI_Comm_size(comm, &size); MPI_Comm_rank(comm, &rank);

  // determine global min and max indices
  T gmax = global_max(idx, comm);
  T gmin = global_min(idx, comm);

  // block size
  T bsize = (gmax - gmin) / size + 1;

  // distribute tuples uniquely and linearly ascending across the ranks ----------------
  vector<T> dest(idx.size()), perm;
  interval(perm, 0, idx.size());

  // find a destination for every tuple
  for(size_t i=0; i<dest.size(); i++)
    dest[i] = (idx[i] - gmin) / bsize;

  // find permutation to sort tuples in the send buffer
  binary_sort_copy(dest, perm);

  // fill send buffer
  vector<T> snd_idx(idx.size());
  vector<V> snd_val(idx.size());
  for(size_t i=0; i<perm.size(); i++) {
    snd_idx[i] = idx[perm[i]];
    snd_val[i] = val[perm[i]];
  }

  // communicate
  commgraph<size_t> grph;
  grph.configure(dest, comm);

  size_t rsize = sum(grph.rcnt);
  out_idx.resize(rsize);
  out_val.resize(rsize);

  MPI_Exchange(grph, snd_idx, out_idx, comm);
  MPI_Exchange(grph, snd_val, out_val, comm);

  // sort the received values locally
  binary_sort_copy(out_idx, out_val);
}

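// Sort variant for entries of varying size: index idx[i] travels together with
// its cnt[i] values from val (counts/displacements convention).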
template<class T, class V>
void sort_parallel(MPI_Comm comm, const vector<T> & idx, const vector<T> & cnt, const vector<V> & val,
                   vector<T> & out_idx, vector<T> & out_cnt, vector<V> & out_val)
{
  int size, rank;
  MPI_Comm_size(comm, &size); MPI_Comm_rank(comm, &rank);

  // determine global min and max indices
  T gmax = global_max(idx, comm);
  T gmin = global_min(idx, comm);

  // block size
  T bsize = (gmax - gmin) / size + 1;

  // distribute tuples uniquely and linearly ascending across the ranks ----------------
  vector<T> dest(idx.size()), perm, dsp;
  interval(perm, 0, idx.size());
  dsp_from_cnt(cnt, dsp);

  // find a destination for every tuple
  for(size_t i=0; i<dest.size(); i++)
    dest[i] = (idx[i] - gmin) / bsize;

  // find permutation to sort tuples in the send buffer
  binary_sort_copy(dest, perm);

  // fill send buffer. this has to happen in two steps since
  // each element has a different size
  vector<T> snd_idx(idx.size()), snd_cnt(idx.size()), snd_dsp;
  vector<V> snd_val(val.size());

  for(size_t i=0; i<perm.size(); i++) {
    snd_idx[i] = idx[perm[i]];
    snd_cnt[i] = cnt[perm[i]];
  }
  dsp_from_cnt(snd_cnt, snd_dsp);

  for(size_t i=0; i<perm.size(); i++) {
    const V* read = val.data() + dsp[perm[i]];
    V* write = snd_val.data() + snd_dsp[i];

    for(T j=0; j<snd_cnt[i]; j++)
      write[j] = read[j];
  }

  // set up two communication graphs, one for one entry per element and one
  // for multiple entries
  commgraph<T> grph, grph_entr;
  grph.configure(dest, comm);
  grph_entr.configure(dest, snd_cnt, comm);

  size_t rsize = sum(grph.rcnt), rsize_entr = sum(grph_entr.rcnt);
  vector<T> rec_cnt(rsize), rec_dsp, out_dsp;
  vector<V> rec_val(rsize_entr);
  out_idx.resize(rsize); out_cnt.resize(rsize);
  out_val.resize(rsize_entr);

  // communicate
  MPI_Exchange(grph, snd_idx, out_idx, comm);
  MPI_Exchange(grph, snd_cnt, rec_cnt, comm);
  MPI_Exchange(grph_entr, snd_val, rec_val, comm);

  dsp_from_cnt(rec_cnt, rec_dsp);

  // sort the received values locally, again in two steps
  interval(perm, 0, rsize);
  binary_sort_copy(out_idx, perm);

  for(size_t i=0; i<perm.size(); i++)
    out_cnt[i] = rec_cnt[perm[i]];

  dsp_from_cnt(out_cnt, out_dsp);

  for(size_t i=0; i<perm.size(); i++) {
    const V* read = rec_val.data() + rec_dsp[perm[i]];
    V* write = out_val.data() + out_dsp[i];

    for(T j=0; j<out_cnt[i]; j++)
      write[j] = read[j];
  }
}

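// Write vector data in binary to disk: rank 0 writes its own chunk first, then
// receives and writes the chunk of every other rank in rank order. Returns the
// total number of values written (broadcast to all ranks).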
template<class V>
size_t root_write(FILE* fd, const vector<V> & vec, MPI_Comm comm)
{
  int size, rank;
  MPI_Comm_rank(comm, &rank); MPI_Comm_size(comm, &size);
  long int lsize = vec.size();
  long int nwr = 0;

  if(rank == 0) {
    // file descriptor on root must be valid
    assert(fd != NULL);
    // write own chunk
    nwr += fwrite(vec.data(), sizeof(V), vec.size(), fd);
  }

  vector<V> wbuff;

  // iterate over other ranks and write their chunks
  for(int pid=1; pid < size; pid++)
  {
    if(rank == pid) {
      MPI_Send(&lsize, 1, MPI_LONG, 0, SF_MPITAG, comm);
      MPI_Send(vec.data(), lsize*sizeof(V), MPI_BYTE, 0, SF_MPITAG, comm);
    }
    else if (rank == 0) {
      long int rsize;
      MPI_Status stat;

      MPI_Recv(&rsize, 1, MPI_LONG, pid, SF_MPITAG, comm, &stat);
      wbuff.resize(rsize);

      MPI_Recv(wbuff.data(), rsize*sizeof(V), MPI_BYTE, pid, SF_MPITAG, comm, &stat);

      nwr += fwrite(wbuff.data(), sizeof(V), rsize, fd);
    }

    MPI_Barrier(comm);
  }

  MPI_Bcast(&nwr, 1, MPI_LONG, 0, comm);
  return nwr;
}

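// Illustrative usage sketch (not part of the original header): dump a
// distributed vector to one binary file. Only rank 0 needs a valid file
// descriptor; the file name, value type and function name are assumptions
// of the example.
inline size_t example_dump_binary(MPI_Comm comm, const vector<double> & loc_data)
{
  int rank;
  MPI_Comm_rank(comm, &rank);

  FILE* fd = rank == 0 ? fopen("data.bin", "wb") : NULL;
  size_t nwr = root_write(fd, loc_data, comm);  // collective call on all ranks
  if(fd) fclose(fd);

  return nwr;
}
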
template<class V>
size_t root_write(FILE* fd, V* vec, const size_t vec_size, MPI_Comm comm)
{
  vector<V> vecbuff;
  vecbuff.assign(vec_size, vec, false);

  size_t nwr = root_write(fd, vecbuff, comm);

  vecbuff.assign(0, NULL, false);

  return nwr;
}

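// Read binary data into a vector: rank 0 reads vec.size() values for itself and
// then, in rank order, a chunk for every other rank and sends it over. Each
// rank must have sized vec to its expected chunk size beforehand.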
template<class V>
size_t root_read(FILE* fd, vector<V> & vec, MPI_Comm comm)
{
  int size, rank;
  MPI_Comm_rank(comm, &rank); MPI_Comm_size(comm, &size);
  long int lsize = vec.size();
  long int nrd = 0;

  if(rank == 0) {
    // file descriptor on root must be valid
    assert(fd != NULL);

    // read own chunk
    nrd += fread(vec.data(), sizeof(V), vec.size(), fd);
    vector<V> rbuff;

    // iterate over other ranks, read their chunks and send them over
    for(int pid=1; pid < size; pid++)
    {
      long int rsize;
      MPI_Status stat;
      MPI_Recv(&rsize, 1, MPI_LONG, pid, SF_MPITAG, comm, &stat);

      rbuff.resize(rsize);
      nrd += fread(rbuff.data(), sizeof(V), rsize, fd);

      MPI_Send(rbuff.data(), rsize*sizeof(V), MPI_BYTE, pid, SF_MPITAG, comm);
    }
  }
  else {
    MPI_Send(&lsize, 1, MPI_LONG, 0, SF_MPITAG, comm);

    MPI_Status stat;
    MPI_Recv(vec.data(), lsize*sizeof(V), MPI_BYTE, 0, SF_MPITAG, comm, &stat);
  }

  MPI_Bcast(&nrd, 1, MPI_LONG, 0, comm);
  return nrd;
}

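// Read ASCII data into a vector: rank 0 parses the values with fscanf (as long
// int if int_data is set, as double otherwise) and distributes them to the
// other ranks as in root_read.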
template<class V>
size_t root_read_ascii(FILE* fd, vector<V> & vec, MPI_Comm comm, bool int_data)
{
  int size, rank;
  MPI_Comm_rank(comm, &rank); MPI_Comm_size(comm, &size);
  long int lsize = vec.size();
  long int nrd = 0;

  double fbuff;
  long int ibuff;

  if(rank == 0) {
    // file descriptor on root must be valid
    assert(fd != NULL);

    // read own chunk
    if(int_data) {
      for(size_t i=0; i<vec.size(); i++) {
        nrd += fscanf(fd, "%ld", &ibuff);
        vec[i] = V(ibuff);
      }
    }
    else {
      for(size_t i=0; i<vec.size(); i++) {
        nrd += fscanf(fd, "%lf", &fbuff);
        vec[i] = V(fbuff);
      }
    }

    vector<V> rbuff;

    // iterate over other ranks, read their chunks and send them over
    for(int pid=1; pid < size; pid++)
    {
      long int rsize;
      MPI_Status stat;
      MPI_Recv(&rsize, 1, MPI_LONG, pid, SF_MPITAG, comm, &stat);

      rbuff.resize(rsize);
      for(long int i=0; i<rsize; i++) {
        if(int_data) {
          nrd += fscanf(fd, "%ld", &ibuff);
          rbuff[i] = V(ibuff);
        }
        else {
          nrd += fscanf(fd, "%lf", &fbuff);
          rbuff[i] = V(fbuff);
        }
      }

      MPI_Send(rbuff.data(), rsize*sizeof(V), MPI_BYTE, pid, SF_MPITAG, comm);
    }
  }
  else {
    MPI_Send(&lsize, 1, MPI_LONG, 0, SF_MPITAG, comm);

    MPI_Status stat;
    MPI_Recv(vec.data(), lsize*sizeof(V), MPI_BYTE, 0, SF_MPITAG, comm, &stat);
  }

  MPI_Bcast(&nrd, 1, MPI_LONG, 0, comm);
  return nrd;
}

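// Count the lines of an ASCII file: the counting is done on rank 0 in chunks
// and the result is broadcast to all ranks.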
inline size_t root_count_ascii_lines(std::string file, MPI_Comm comm)
{
  int size, rank;
  MPI_Comm_size(comm, &size); MPI_Comm_rank(comm, &rank);

  size_t line_count = 0;
  if(rank == 0) {
    FILE* fd = fopen(file.c_str(), "r");

    if(fd) {
      size_t fsize = file_size(fd);

      vector<int> chunk_sizes;
      divide(fsize, size, chunk_sizes);

      vector<unsigned char> chunk;
      for(int chunk_size : chunk_sizes) {
        chunk.resize(chunk_size);
        fread(chunk.data(), chunk_size, 1, fd);

        for(unsigned char c : chunk)
          if(c == '\n') line_count++;
      }

      fclose(fd);
    } else {
      fprintf(stderr, "%s error: Cannot open file %s!\n", __func__, file.c_str());
    }
  }

  MPI_Bcast(&line_count, sizeof(size_t), MPI_BYTE, 0, comm);
  return line_count;
}

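// Illustrative usage sketch (not part of the original header): read one
// floating-point value per line of an ASCII file into a distributed vector.
// The file layout, value type and function name are assumptions of the example.
inline void example_read_ascii_column(MPI_Comm comm, const std::string & fname,
                                      vector<double> & loc_data)
{
  int size, rank;
  MPI_Comm_size(comm, &size); MPI_Comm_rank(comm, &rank);

  // every rank learns the global number of lines
  size_t num_lines = root_count_ascii_lines(fname, comm);

  // split the lines evenly across the ranks and size the local buffer
  vector<size_t> loc_sizes;
  divide(num_lines, size, loc_sizes);
  loc_data.resize(loc_sizes[rank]);

  // rank 0 opens the file and parses the values for all ranks
  FILE* fd = rank == 0 ? fopen(fname.c_str(), "r") : NULL;
  root_read_ascii(fd, loc_data, comm, false);

  if(fd) fclose(fd);
}
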
template<class V>
size_t root_read(FILE* fd, V* vec, const size_t vec_size, MPI_Comm comm)
{
  vector<V> vecbuff;
  vecbuff.assign(vec_size, vec, false);

  size_t nrd = root_read(fd, vecbuff, comm);

  vecbuff.assign(0, NULL, false);

  return nrd;
}

template<class V>
size_t root_read_ascii(FILE* fd, V* vec, const size_t vec_size, MPI_Comm comm, bool int_type)
{
  vector<V> vecbuff;
  vecbuff.assign(vec_size, vec, false);

  size_t nrd = root_read_ascii(fd, vecbuff, comm, int_type);

  vecbuff.assign(0, NULL, false);

  return nrd;
}

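// Write index-value pairs to disk in ordered permutation: the data is first
// sorted in parallel by its global indices, then written by rank 0.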
template<class T, class V>
size_t root_write_ordered(FILE* fd, const vector<T> & idx, const vector<V> & vec, MPI_Comm comm)
{
  vector<T> srt_idx;
  vector<V> srt_vec;

  sort_parallel(comm, idx, vec, srt_idx, srt_vec);
  return root_write(fd, srt_vec, comm);
}

template<class T, class V>
size_t root_write_ordered(FILE* fd, const vector<T> & idx, const vector<T> & cnt,
                          const vector<V> & vec, MPI_Comm comm)
{
  vector<T> srt_idx, srt_cnt;
  vector<V> srt_vec;

  sort_parallel(comm, idx, cnt, vec, srt_idx, srt_cnt, srt_vec);
  return root_write(fd, srt_vec, comm);
}

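// Illustrative usage sketch (not part of the original header): write nodal data
// to disk in ascending global-index order, so the file layout is independent of
// the parallel partitioning. Names and types are assumptions of the example.
inline size_t example_dump_ordered(FILE* fd, const vector<long> & global_idx,
                                   const vector<double> & nodal_data, MPI_Comm comm)
{
  // entries are sorted in parallel by global index, then funneled through rank 0
  return root_write_ordered(fd, global_idx, nodal_data, comm);
}
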
template<class T, class V>
size_t root_write_ordered(FILE* fd, T* idx, V* vec, const size_t vec_size, MPI_Comm comm)
{
  vector<T> idxbuff;
  vector<V> vecbuff;

  idxbuff.assign(vec_size, idx, false);
  vecbuff.assign(vec_size, vec, false);

  size_t nwr = root_write_ordered(fd, idxbuff, vecbuff, comm);

  idxbuff.assign(0, NULL, false);
  vecbuff.assign(0, NULL, false);
  return nwr;
}

template<class T, class V>
size_t root_write_ordered(FILE* fd, T* idx, T* cnt, V* vec,
                          const size_t idx_size, const size_t vec_size, MPI_Comm comm)
{
  vector<T> idxbuff, cntbuff;
  vector<V> vecbuff;

  idxbuff.assign(idx_size, idx, false);
  cntbuff.assign(idx_size, cnt, false);
  vecbuff.assign(vec_size, vec, false);

  size_t nwr = root_write_ordered(fd, idxbuff, cntbuff, vecbuff, comm);

  idxbuff.assign(0, NULL, false);
  cntbuff.assign(0, NULL, false);
  vecbuff.assign(0, NULL, false);

  return nwr;
}

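// Print a distributed vector to a file stream: rank 0 prints its own values and
// then those of every other rank, dpn values per line.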
template<class T>
void print_vector(MPI_Comm comm, const vector<T> & vec, const short dpn, FILE* fd)
{
  int size, rank;
  MPI_Comm_size(comm, &size); MPI_Comm_rank(comm, &rank);

  if(rank == 0) {
    for (size_t i=0; i<vec.size() / dpn; i++ ) {
      for(short j=0; j<dpn-1; j++)
        fprintf(fd, "%g ", double(vec[i*dpn + j]) );
      fprintf(fd, "%g\n", double(vec[i*dpn + (dpn-1)]) );
    }

    // iterate over other ranks and write their chunks
    vector<T> wbuff;
    for(int pid=1; pid < size; pid++)
    {
      long int rsize;
      MPI_Status stat;
      MPI_Recv(&rsize, 1, MPI_LONG, pid, SF_MPITAG, comm, &stat);
      wbuff.resize(rsize);
      MPI_Recv(wbuff.data(), rsize*sizeof(T), MPI_BYTE, pid, SF_MPITAG, comm, &stat);

      for (size_t i=0; i<wbuff.size() / dpn; i++ ) {
        for(short j=0; j<dpn-1; j++)
          fprintf(fd, "%g ", double(wbuff[i*dpn + j]) );

        fprintf(fd, "%g\n", double(wbuff[i*dpn + (dpn-1)]) );
      }
    }
  } else {
    long int lsize = vec.size();
    MPI_Send(&lsize, 1, MPI_LONG, 0, SF_MPITAG, comm);
    MPI_Send(vec.data(), lsize*sizeof(T), MPI_BYTE, 0, SF_MPITAG, comm);
  }
}

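// Write distributed data to an ASCII file, sorted by the global indices in idx,
// with dpn values per line.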
template<class T, class S>
void write_data_ascii(const MPI_Comm comm, const vector<T> & idx, const vector<S> & data,
                      std::string file, short dpn = 1)
{
  assert(idx.size() == data.size());

  int rank;
  MPI_Comm_rank(comm, &rank);

  vector<T> srt_idx;
  vector<S> srt_data;
  sort_parallel(comm, idx, data, srt_idx, srt_data);

  FILE* fd = NULL;
  if(rank == 0) {
    fd = fopen(file.c_str(), "w");
    if(fd == NULL) {
      fprintf(stderr, "%s error: Cannot open file %s for writing! Aborting!\n", __func__, file.c_str());
      exit(1);
    }
  }

  print_vector(comm, srt_data, dpn, fd);

  if(fd) fclose(fd);
}

}  // namespace SF

#endif